最近一直暢遊在RocketMQ的源碼中,發現在RocketMQ中很多地方都使用到了CompletableFuture,所以今天就跟大家來聊一聊JDK1.8提供的異步神器CompletableFuture,並且最後會結合RocketMQ源碼分析一下CompletableFuture的使用。
Future接口以及它的局限性我們都知道,Java中創建線程的方式主要有兩種方式,繼承Thread或者實現Runnable接口。但是這兩種都是有一個共同的缺點,那就是都無法獲取到線程執行的結果,也就是沒有返回值。於是在JDK1.5 以後為了解決這種沒有返回值的問題,提供了Callable和Future接口以及Future對應的實現類FutureTask,通過FutureTask的就可以獲取到異步執行的結果。
於是乎,我們想要開啟異步線程,執行任務,獲取結果,就可以這麼實現。
FutureTask<String>futureTask=newFutureTask<>(()->"三友");newThread(futureTask).start();System.out.println(futureTask.get());或者使用線程池的方式
ExecutorServiceexecutorService=Executors.newSingleThreadExecutor();Future<String>future=executorService.submit(()->"三友");System.out.println(future.get());executorService.shutdown();線程池底層也是將提交的Callable的實現先封裝成FutureTask,然後通過execute方法來提交任務,執行異步邏輯。
Future接口的局限性雖然通過Future接口的get方法可以獲取任務異步執行的結果,但是get方法會阻塞主線程,也就是異步任務沒有完成,主線程會一直阻塞,直到任務結束。
Future也提供了isDone方法來查看異步線程任務執行是否完成,如果完成,就可以獲取任務的執行結果,代碼如下。
ExecutorServiceexecutorService=Executors.newSingleThreadExecutor();Future<String>future=executorService.submit(()->"三友");while(!future.isDone()){//任務有沒有完成,沒有就繼續循環判斷}System.out.println(future.get());executorService.shutdown();但是這種輪詢查看異步線程任務執行狀態,也是非常消耗cpu資源。
同時對於一些複雜的異步操作任務的處理,可能需要各種同步組件來一起完成。
所以,通過上面的介紹可以看出,Future在使用的過程中還是有很強的局限性,所以為了解決這種局限性,在JDK1.8的時候,Doug Lea 大神為我們提供了一種更為強大的類CompletableFuture。
什麼是CompletableFuture?CompletableFuture在JDK1.8提供了一種更加強大的異步編程的api。它實現了Future接口,也就是Future的功能特性CompletableFuture也有;除此之外,它也實現了CompletionStage接口,CompletionStage接口定義了任務編排的方法,執行某一階段,可以向下執行後續階段。
CompletableFuture相比於Future最大的改進就是提供了類似觀察者模式的回調監聽的功能,也就是當上一階段任務執行結束之後,可以回調你指定的下一階段任務,而不需要阻塞獲取結果之後來處理結果。
CompletableFuture常見api詳解CompletableFuture的方法api多,但主要可以分為以下幾類。
1、實例化CompletableFuture構造方法創建CompletableFuture<String>completableFuture=newCompletableFuture<>();System.out.println(completableFuture.get());此時如果有其它線程執行如下代碼,就能執行打印出 三友
completableFuture.complete("三友")靜態方法創建除了使用構造方法構造,CompletableFuture還提供了靜態方法來創建
publicstatic<U>CompletableFuture<U>supplyAsync(Supplier<U>supplier);publicstatic<U>CompletableFuture<U>supplyAsync(Supplier<U>supplier,Executorexecutor);publicstaticCompletableFuture<Void>runAsync(Runnablerunnable);publicstaticCompletableFuture<Void>runAsync(Runnablerunnable,Executorexecutor);supply 和 run 的主要區別就是 supply 可以有返回值,run 沒有返回值。至於另一個參數Executor 就是用來執行異步任務的線程池,如果不傳Executor 的話,默認是ForkJoinPool這個線程池的實現。
一旦通過靜態方法來構造,會立馬開啟異步線程執行Supplier或者Runnable提交的任務。
CompletableFuture<String>completableFuture=CompletableFuture.supplyAsync(()->"三友");System.out.println(completableFuture.get());一旦任務執行完成,就可以打印返回值,這裡的使用方法跟Future是一樣的。
所以對比兩個兩種實例化的方法,使用靜態方法的和使用構造方法主要區別就是,使用構造方法需要其它線程主動調用complete來表示任務執行完成,因為很簡單,因為在構造的時候沒有執行異步的任務,所以需要其它線程主動調用complete來表示任務執行完成。
2、獲取任務執行結果publicTget();publicTget(longtimeout,TimeUnitunit);publicTgetNow(TvalueIfAbsent);publicTjoin();get()和get(long timeout, TimeUnit unit)是實現了Future接口的功能,兩者主要區別就是get()會一直阻塞直到獲取到結果,get(long timeout, TimeUnit unit)值可以指定超時時間,當到了指定的時間還未獲取到任務,就會拋出TimeoutException異常。
getNow(T valueIfAbsent):就是獲取任務的執行結果,但不會產生阻塞。如果任務還沒執行完成,那麼就會返回你傳入的 valueIfAbsent 參數值,如果執行完成了,就會返回任務執行的結果。
join():跟get()的主要區別就是,get()會拋出檢查時異常,join()不會。
3、主動觸發任務完成publicbooleancomplete(Tvalue);publicbooleancompleteExceptionally(Throwableex);complete:主動觸發當前異步任務的完成。調用此方法時如果你的任務已經完成,那麼方法就會返回false;如果任務沒完成,就會返回true,並且其它線程獲取到的任務的結果就是complete的參數值。
completeExceptionally:跟complete的作用差不多,complete是正常結束任務,返回結果,而completeExceptionally就是觸發任務執行的異常。
4、對任務執行結果進行下一步處理只能接收任務正常執行後的回調public<U>CompletionStage<U>thenApply(Function<?superT,?extendsU>fn);publicCompletableFuture<Void>thenRun(Runnableaction);publicCompletionStage<Void>thenAccept(Consumer<?superT>action);這類回調的特點就是,當任務正常執行完成,沒有異常的時候就會回調。
thenApply:可以拿到上一步任務執行的結果進行處理,並且返回處理的結果 thenRun:拿不到上一步任務執行的結果,但會執行Runnable接口的實現 thenAccept:可以拿到上一步任務執行的結果進行處理,但不需要返回處理的結果
thenApply示例:
CompletableFuture<String>completableFuture=CompletableFuture.supplyAsync(()->10).thenApply(v->("上一步的執行的結果為:"+v));System.out.println(completableFuture.join());執行結果:
上一步的執行的結果為:10thenRun示例:
CompletableFuture<Void>completableFuture=CompletableFuture.supplyAsync(()->10).thenRun(()->System.out.println("上一步執行完成"));執行結果:
上一步執行完成thenAccept示例:
CompletableFuture<Void>completableFuture=CompletableFuture.supplyAsync(()->10).thenAccept(v->System.out.println("上一步執行完成,結果為:"+v));執行結果:
上一步執行完成,結果為:10thenApply有異常示例:
CompletableFuture<String>completableFuture=CompletableFuture.supplyAsync(()->{//模擬異常inti=1/0;return10;}).thenApply(v->("上一步的執行的結果為:"+v));System.out.println(completableFuture.join());執行結果:
Exceptioninthread"main"java.util.concurrent.CompletionException:java.lang.ArithmeticException:/byzeroatjava.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)atjava.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)atjava.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)當有異常時是不會回調的
只能接收任務處理異常後的回調publicCompletionStage<T>exceptionally(Function<Throwable,?extendsT>fn);當上面的任務執行過程中出現異常的時候,會回調exceptionally方法指定的回調,但是如果沒有出現異常,是不會回調的。
exceptionally能夠將異常給吞了,並且fn的返回值會返回回去。
其實這個exceptionally方法有點像降級的味道。當出現異常的時候,走到這個回調,可以返回一個默認值回去。
沒有異常情況下:
CompletableFuture<Integer>completableFuture=CompletableFuture.supplyAsync(()->{return100;}).exceptionally(e->{System.out.println("出現異常了,返回默認值");return110;});System.out.println(completableFuture.join());執行結果:
100有異常情況下:
CompletableFuture<Integer>completableFuture=CompletableFuture.supplyAsync(()->{inti=1/0;return100;}).exceptionally(e->{System.out.println("出現異常了,返回默認值");return110;});System.out.println(completableFuture.join());執行結果:
出現異常了,返回默認值110能同時接收任務執行正常和異常的回調public<U>CompletionStage<U>handle(BiFunction<?superT,Throwable,?extendsU>fn);publicCompletionStage<T>whenComplete(BiConsumer<?superT,?superThrowable>actin);不論前面的任務執行成功還是失敗都會回調的這類方法指定的回調方法。
handle : 跟exceptionally有點像,但是exceptionally是出現異常才會回調,兩者都有返回值,都能吞了異常,但是handle正常情況下也能回調。
whenComplete:能接受正常或者異常的回調,並且不影響上個階段的返回值,也就是主線程能獲取到上個階段的返回值;當出現異常時,whenComplete並不能吞了這個異常,也就是說主線程在獲取執行異常任務的結果時,會拋出異常。
這裡演示一下whenComplete處理異常示例情況,handle跟exceptionally對異常的處理差不多。
whenComplete處理異常示例:
CompletableFuture<Integer>completableFuture=CompletableFuture.supplyAsync(()->{inti=1/0;return10;}).whenComplete((r,e)->{System.out.println("whenComplete被調用了");});System.out.println(completableFuture.join());執行結果:
whenComplete被調用了Exceptioninthread"main"java.util.concurrent.CompletionException:java.lang.ArithmeticException:/byzeroatjava.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)atjava.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)atjava.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)5、對任務結果進行合併public<U,V>CompletionStage<V>thenCombine(CompletionStage<?extendsU>other,BiFunction<?superT,?superU,?extendsV>fn);這個方法的意思是,當前任務和other任務都執行結束後,拿到這兩個任務的執行結果,回調 BiFunction ,然後返回新的結果。
thenCombine的例子請往下繼續看。
6、以Async結尾的方法上面說的一些方法,比如說thenAccept方法,他有兩個對應的Async結尾的方法,如下:
publicCompletionStage<Void>thenAcceptAsync(Consumer<?superT>action,Executorexecutor);publicCompletionStage<Void>thenAcceptAsync(Consumer<?superT>action);thenAcceptAsync跟thenAccept的主要區別就是thenAcceptAsync會重新開一個線程來執行下一階段的任務,而thenAccept還是用上一階段任務執行的線程執行。
兩個thenAcceptAsync主要區別就是一個使用默認的線程池來執行任務,也就是ForkJoinPool,一個是使用方法參數傳入的線程池來執行任務。
當然除了thenAccept方法之外,上述提到的方法還有很多帶有Async結尾的對應的方法,他們的主要區別就是執行任務是否開啟異步線程來執行的區別。
當然,還有一些其它的api,可以自行查看
CompletableFuture在RocketMQ中的使用CompletableFuture在RocketMQ中的使用場景比較多,這裡我舉一個消息存儲的場景。
在RocketMQ中,Broker接收到生產者產生的消息的時候,會將消息持久化到磁盤和同步到從節點中。持久化到磁盤和消息同步到從節點是兩個獨立的任務,互不干擾,可以相互獨立執行。當消息持久化到磁盤和同步到從節點中任務完成之後,需要統計整個存儲消息消耗的時間,所以統計整個存儲消息消耗的時間是依賴前面兩個任務的完成。

實現代碼如下
消息存儲刷盤任務和主從複製任務:
PutMessageResultputMessageResult=newPutMessageResult(PutMessageStatus.PUT_OK,result);//提交刷盤的請求CompletableFuture<PutMessageStatus>flushResultFuture=submitFlushRequest(result,msg);//提交主從複製的請求CompletableFuture<PutMessageStatus>replicaResultFuture=submitReplicaRequest(result,msg);//刷盤和主從複製兩個異步任務通過thenCombine聯合returnflushResultFuture.thenCombine(replicaResultFuture,(flushStatus,replicaStatus)->{//當兩個刷盤和主從複製任務都完成的時候,就會回調//如果刷盤沒有成功,那麼就將消息存儲的狀態設置為失敗if(flushStatus!=PutMessageStatus.PUT_OK){putMessageResult.setPutMessageStatus(flushStatus);}//如果主從複製沒有成功,那麼就將消息存儲的狀態設置為失敗if(replicaStatus!=PutMessageStatus.PUT_OK){putMessageResult.setPutMessageStatus(replicaStatus);}//最終返回消息存儲的結果returnputMessageResult;});對上面兩個合併的任務執行結果通過thenAccept方法進行監聽,統計消息存儲的耗時:
//消息存儲的開始時間longbeginTime=this.getSystemClock().now();//存儲消息,然後返回 CompletableFuture,也就是上面一段代碼得返回值CompletableFuture<PutMessageResult>putResultFuture=this.commitLog.asyncPutMessage(msg);//監聽消息存儲的結果putResultFuture.thenAccept((result)->{//消息存儲完成之後會回調longelapsedTime=this.getSystemClock().now()-beginTime;if(elapsedTime>500){log.warn("putMessagenotinlockelapsedtime(ms)={},bodyLength={}",elapsedTime,msg.getBody().length);}this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);if(null==result||!result.isOk()){this.storeStatsService.getPutMessageFailedTimes().add(1);}});CompletableFuture的優點1、異步函數式編程,實現優雅,易於維護;
2、它提供了異常管理的機制,讓你有機會拋出、管理異步任務執行中發生的異常,監聽這些異常的發生;
3、擁有對任務編排的能力。藉助這項能力,可以輕鬆地組織不同任務的運行順序、規則以及方式。
參考:
[1]https://zhuanlan.zhihu.com/p/344431341