Spring Boot 自定義線程池實現異步開發相信看過陳某的文章都了解,但是在實際開發中需要在父子線程之間傳遞一些數據,比如用戶信息,鏈路信息等等
比如用戶登錄信息使用ThreadLocal存放保證線程隔離,代碼如下:
/** *@description用戶上下文信息 */ publicclassOauthContext{ privatestaticfinalThreadLocalloginValThreadLocal=newThreadLocal<>(); publicstaticLoginValget(){ returnloginValThreadLocal.get(); } publicstaticvoidset(LoginValloginVal){ loginValThreadLocal.set(loginVal); } publicstaticvoidclear(){ loginValThreadLocal.remove(); } }
那么子線程想要獲取這個LoginVal如何做呢?
今天就來介紹幾種優雅的方式實現Spring Boot 內部的父子線程的數據傳遞。
1. 手動設置
每執行一次異步線程都要分為兩步:
獲取父線程的LoginVal
將LoginVal設置到子線程,達到復用
代碼如下:
publicvoidhandlerAsync(){ //1.獲取父線程的loginVal LoginValloginVal=OauthContext.get(); log.info("父線程的值:{}",OauthContext.get()); CompletableFuture.runAsync(()->{ //2.設置子線程的值,復用 OauthContext.set(loginVal); log.info("子線程的值:{}",OauthContext.get()); }); }
雖然能夠實現目的,但是每次開異步線程都需要手動設置,重復代碼太多,看了頭疼,你認為優雅嗎?
基于 Spring Boot + MyBatis Plus + Vue & Element 實現的后臺管理系統 + 用戶小程序,支持 RBAC 動態權限、多租戶、數據權限、工作流、三方登錄、支付、短信、商城等功能
項目地址:https://github.com/YunaiV/ruoyi-vue-pro
2. 線程池設置TaskDecorator
TaskDecorator是什么?官方api的大致意思:這是一個執行回調方法的裝飾器,主要應用于傳遞上下文,或者提供任務的監控/統計信息。
知道有這么一個東西,如何去使用?
TaskDecorator是一個接口,首先需要去實現它,代碼如下:
/** *@description上下文裝飾器 */ publicclassContextTaskDecoratorimplementsTaskDecorator{ @Override publicRunnabledecorate(Runnablerunnable){ //獲取父線程的loginVal LoginValloginVal=OauthContext.get(); return()->{ try{ //將主線程的請求信息,設置到子線程中 OauthContext.set(loginVal); //執行子線程,這一步不要忘了 runnable.run(); }finally{ //線程結束,清空這些信息,否則可能造成內存泄漏 OauthContext.clear(); } }; } }
這里我只是設置了LoginVal,實際開發中其他的共享數據,比如SecurityContext,RequestAttributes....
TaskDecorator需要結合線程池使用,實際開發中異步線程建議使用線程池,只需要在對應的線程池配置一下,代碼如下:
@Bean("taskExecutor") publicThreadPoolTaskExecutortaskExecutor(){ ThreadPoolTaskExecutorpoolTaskExecutor=newThreadPoolTaskExecutor(); poolTaskExecutor.setCorePoolSize(xx); poolTaskExecutor.setMaxPoolSize(xx); //設置線程活躍時間(秒) poolTaskExecutor.setKeepAliveSeconds(xx); //設置隊列容量 poolTaskExecutor.setQueueCapacity(xx); //設置TaskDecorator,用于解決父子線程間的數據復用 poolTaskExecutor.setTaskDecorator(newContextTaskDecorator()); poolTaskExecutor.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy()); //等待所有任務結束后再關閉線程池 poolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true); returnpoolTaskExecutor; }
此時業務代碼就不需要去設置子線程的值,直接使用即可,代碼如下:
publicvoidhandlerAsync(){ log.info("父線程的用戶信息:{}",OauthContext.get()); //執行異步任務,需要指定的線程池 CompletableFuture.runAsync(()->log.info("子線程的用戶信息:{}",OauthContext.get()),taskExecutor); }
來看一下結果,如下圖:
這里使用的是CompletableFuture執行異步任務,使用@Async這個注解同樣是可行的。
注意 :無論使用何種方式,都需要指定線程池
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 實現的后臺管理系統 + 用戶小程序,支持 RBAC 動態權限、多租戶、數據權限、工作流、三方登錄、支付、短信、商城等功能
項目地址:https://github.com/YunaiV/yudao-cloud
3. InheritableThreadLocal
這種方案不建議使用,InheritableThreadLocal雖然能夠實現父子線程間的復用,但是在線程池中使用會存在復用的問題。
這種方案使用也是非常簡單,直接用InheritableThreadLocal替換ThreadLocal即可,代碼如下:
/** *@description用戶上下文信息 */ publicclassOauthContext{ privatestaticfinalInheritableThreadLocalloginValThreadLocal=newInheritableThreadLocal<>(); publicstaticLoginValget(){ returnloginValThreadLocal.get(); } publicstaticvoidset(LoginValloginVal){ loginValThreadLocal.set(loginVal); } publicstaticvoidclear(){ loginValThreadLocal.remove(); } }
4. TransmittableThreadLocal
TransmittableThreadLocal是阿里開源的工具,彌補了InheritableThreadLocal的缺陷,在使用線程池等會池化復用線程的執行組件情況下,提供ThreadLocal值的傳遞功能,解決異步執行時上下文傳遞的問題。
使用起來也是非常簡單,添加依賴如下:
com.alibaba transmittable-thread-local 2.14.2
OauthContext改造代碼如下:
/** *@description用戶上下文信息 */ publicclassOauthContext{ privatestaticfinalTransmittableThreadLocalloginValThreadLocal=newTransmittableThreadLocal<>(); publicstaticLoginValget(){ returnloginValThreadLocal.get(); } publicstaticvoidset(LoginValloginVal){ loginValThreadLocal.set(loginVal); } publicstaticvoidclear(){ loginValThreadLocal.remove(); } }
TransmittableThreadLocal原理
從定義來看,TransimittableThreadLocal繼承于InheritableThreadLocal,并實現TtlCopier接口,它里面只有一個copy方法。所以主要是對InheritableThreadLocal的擴展。
publicclassTransmittableThreadLocalextendsInheritableThreadLocal implementsTtlCopier
在TransimittableThreadLocal中添加holder屬性。這個屬性的作用就是被標記為具備線程傳遞資格的對象都會被添加到這個對象中。
要標記一個類,比較容易想到的方式,就是給這個類新增一個Type字段,還有一個方法就是將具備這種類型的的對象都添加到一個靜態全局集合中。之后使用時,這個集合里的所有值都具備這個標記。
//1.holder本身是一個InheritableThreadLocal對象 //2.這個holder對象的value是WeakHashMap,?> //2.1WeekHashMap的value總是null,且不可能被使用。 //2.2WeekHasshMap支持value=null privatestaticInheritableThreadLocal ,?>>holder=newInheritableThreadLocal ,?>>(){ @Override protectedWeakHashMap ,?>initialValue(){ returnnewWeakHashMap ,Object>(); } /** *重寫了childValue方法,實現上直接將父線程的屬性作為子線程的本地變量對象。 */ @Override protectedWeakHashMap ,?>childValue(WeakHashMap ,?>parentValue){ returnnewWeakHashMap ,Object>(parentValue); } };
應用代碼是通過TtlExecutors工具類對線程池對象進行包裝。工具類只是簡單的判斷,輸入的線程池是否已經被包裝過、非空校驗等,然后返回包裝類ExecutorServiceTtlWrapper。根據不同的線程池類型,有不同和的包裝類。
@Nullable publicstaticExecutorServicegetTtlExecutorService(@NullableExecutorServiceexecutorService){ if(TtlAgent.isTtlAgentLoaded()||executorService==null||executorServiceinstanceofTtlEnhanced){ returnexecutorService; } returnnewExecutorServiceTtlWrapper(executorService); }
進入包裝類ExecutorServiceTtlWrapper。可以注意到不論是通過ExecutorServiceTtlWrapper#submit方法或者是ExecutorTtlWrapper#execute方法,都會將線程對象包裝成TtlCallable或者TtlRunnable,用于在真正執行run方法前做一些業務邏輯。
/** *在ExecutorServiceTtlWrapper實現submit方法 */ @NonNull @Override publicFuture submit(@NonNullCallable task){ returnexecutorService.submit(TtlCallable.get(task)); } /** *在ExecutorTtlWrapper實現execute方法 */ @Override publicvoidexecute(@NonNullRunnablecommand){ executor.execute(TtlRunnable.get(command)); }
所以,重點的核心邏輯應該是在TtlCallable#call()或者TtlRunnable#run()中。以下以TtlCallable為例,TtlRunnable同理類似。在分析call()方法之前,先看一個類Transmitter
publicstaticclassTransmitter{ /** *捕獲當前線程中的是所有TransimittableThreadLocal和注冊ThreadLocal的值。 */ @NonNull publicstaticObjectcapture(){ returnnewSnapshot(captureTtlValues(),captureThreadLocalValues()); } /** *捕獲TransimittableThreadLocal的值,將holder中的所有值都添加到HashMap后返回。 */ privatestaticHashMap,Object>captureTtlValues(){ HashMap ,Object>ttl2Value= newHashMap ,Object>(); for(TransmittableThreadLocal
進入TtlCallable#call()方法。
@Override publicVcall()throwsException{ Objectcaptured=capturedRef.get(); if(captured==null||releaseTtlValueReferenceAfterCall&& !capturedRef.compareAndSet(captured,null)){ thrownewIllegalStateException("TTLvaluereferenceisreleasedaftercall!"); } //調用replay方法將捕獲到的當前線程的本地變量,傳遞給線程池線程的本地變量, //并且獲取到線程池線程覆蓋之前的本地變量副本。 Objectbackup=replay(captured); try{ //線程方法調用 returncallable.call(); }finally{ //使用副本進行恢復。 restore(backup); } }
到這基本上線程池方式傳遞本地變量的核心代碼已經大概看完了。總的來說在創建TtlCallable對象是,調用capture()方法捕獲調用方的本地線程變量,在call()執行時,將捕獲到的線程變量,替換到線程池所對應獲取到的線程的本地變量中,并且在執行完成之后,將其本地變量恢復到調用之前。
總結
上述列舉了4種方案,陳某這里推薦方案2和方案4,其中兩種方案的缺點非常明顯,實際開發中也是采用的方案2或者方案4。
-
接口
+關注
關注
33文章
8932瀏覽量
153186 -
spring
+關注
關注
0文章
340瀏覽量
14878 -
Boot
+關注
關注
0文章
153瀏覽量
36539 -
線程
+關注
關注
0文章
507瀏覽量
20070 -
數據傳遞
+關注
關注
1文章
3瀏覽量
1801
原文標題:用這4招 優雅的實現Spring Boot 異步線程間數據傳遞
文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
Spring Boot如何實現異步任務
Spring Boot虛擬線程和Webflux性能對比

LabVIEW多線程編程數據傳遞教程

請問C6678核間數據傳遞方式是什么?為什么是這樣?
啟動Spring Boot項目應用的三種方法
基于Spring Cloud和Euraka的優雅下線以及灰度發布
Spring Boot Web相關的基礎知識
簡述Spring Boot數據校驗
Spring Boot如何優雅實現數據加密存儲、模糊匹配和脫敏

Spring Boot Actuator快速入門
Spring Boot啟動 Eureka流程

Spring Boot的啟動原理

評論