女人自慰AV免费观看内涵网,日韩国产剧情在线观看网址,神马电影网特片网,最新一级电影欧美,在线观看亚洲欧美日韩,黄色视频在线播放免费观看,ABO涨奶期羡澄,第一导航fulione,美女主播操b

0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發帖/加入社區
會員中心
創作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

用這4招 優雅的實現Spring Boot異步線程間數據傳遞

jf_ro2CN3Fa ? 來源:碼猿技術專欄 ? 2023-01-30 10:40 ? 次閱讀

Spring Boot 自定義線程池實現異步開發相信看過陳某的文章都了解,但是在實際開發中需要在父子線程之間傳遞一些數據,比如用戶信息,鏈路信息等等

比如用戶登錄信息使用ThreadLocal存放保證線程隔離,代碼如下:

/**
*@description用戶上下文信息
*/
publicclassOauthContext{
privatestaticfinalThreadLocalloginValThreadLocal=newThreadLocal<>();

publicstaticLoginValget(){
returnloginValThreadLocal.get();
}
publicstaticvoidset(LoginValloginVal){
loginValThreadLocal.set(loginVal);
}
publicstaticvoidclear(){
loginValThreadLocal.remove();
}
}

那么子線程想要獲取這個LoginVal如何做呢?

今天就來介紹幾種優雅的方式實現Spring Boot 內部的父子線程的數據傳遞。

7bd85a70-9688-11ed-bfe3-dac502259ad0.png

1. 手動設置

每執行一次異步線程都要分為兩步:

獲取父線程的LoginVal

將LoginVal設置到子線程,達到復用

代碼如下:

publicvoidhandlerAsync(){
//1.獲取父線程的loginVal
LoginValloginVal=OauthContext.get();
log.info("父線程的值:{}",OauthContext.get());
CompletableFuture.runAsync(()->{
//2.設置子線程的值,復用
OauthContext.set(loginVal);
log.info("子線程的值:{}",OauthContext.get());
});
}

雖然能夠實現目的,但是每次開異步線程都需要手動設置,重復代碼太多,看了頭疼,你認為優雅嗎?

基于 Spring Boot + MyBatis Plus + Vue & Element 實現的后臺管理系統 + 用戶小程序,支持 RBAC 動態權限、多租戶、數據權限、工作流、三方登錄、支付、短信、商城等功能

項目地址:https://github.com/YunaiV/ruoyi-vue-pro

2. 線程池設置TaskDecorator

TaskDecorator是什么?官方api的大致意思:這是一個執行回調方法的裝飾器,主要應用于傳遞上下文,或者提供任務的監控/統計信息。

知道有這么一個東西,如何去使用?

TaskDecorator是一個接口,首先需要去實現它,代碼如下:

/**
*@description上下文裝飾器
*/
publicclassContextTaskDecoratorimplementsTaskDecorator{
@Override
publicRunnabledecorate(Runnablerunnable){
//獲取父線程的loginVal
LoginValloginVal=OauthContext.get();
return()->{
try{
//將主線程的請求信息,設置到子線程中
OauthContext.set(loginVal);
//執行子線程,這一步不要忘了
runnable.run();
}finally{
//線程結束,清空這些信息,否則可能造成內存泄漏
OauthContext.clear();
}
};
}
}

這里我只是設置了LoginVal,實際開發中其他的共享數據,比如SecurityContext,RequestAttributes....

TaskDecorator需要結合線程池使用,實際開發中異步線程建議使用線程池,只需要在對應的線程池配置一下,代碼如下:

@Bean("taskExecutor")
publicThreadPoolTaskExecutortaskExecutor(){
ThreadPoolTaskExecutorpoolTaskExecutor=newThreadPoolTaskExecutor();
poolTaskExecutor.setCorePoolSize(xx);
poolTaskExecutor.setMaxPoolSize(xx);
//設置線程活躍時間(秒)
poolTaskExecutor.setKeepAliveSeconds(xx);
//設置隊列容量
poolTaskExecutor.setQueueCapacity(xx);
//設置TaskDecorator,用于解決父子線程間的數據復用
poolTaskExecutor.setTaskDecorator(newContextTaskDecorator());
poolTaskExecutor.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy());
//等待所有任務結束后再關閉線程池
poolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
returnpoolTaskExecutor;
}

此時業務代碼就不需要去設置子線程的值,直接使用即可,代碼如下:

publicvoidhandlerAsync(){
log.info("父線程的用戶信息:{}",OauthContext.get());
//執行異步任務,需要指定的線程池
CompletableFuture.runAsync(()->log.info("子線程的用戶信息:{}",OauthContext.get()),taskExecutor);
}

來看一下結果,如下圖:

7bee3aca-9688-11ed-bfe3-dac502259ad0.png

這里使用的是CompletableFuture執行異步任務,使用@Async這個注解同樣是可行的。

注意 :無論使用何種方式,都需要指定線程池

基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 實現的后臺管理系統 + 用戶小程序,支持 RBAC 動態權限、多租戶、數據權限、工作流、三方登錄、支付、短信、商城等功能

項目地址:https://github.com/YunaiV/yudao-cloud

3. InheritableThreadLocal

這種方案不建議使用,InheritableThreadLocal雖然能夠實現父子線程間的復用,但是在線程池中使用會存在復用的問題。

這種方案使用也是非常簡單,直接用InheritableThreadLocal替換ThreadLocal即可,代碼如下:

/**
*@description用戶上下文信息
*/
publicclassOauthContext{
privatestaticfinalInheritableThreadLocalloginValThreadLocal=newInheritableThreadLocal<>();

publicstaticLoginValget(){
returnloginValThreadLocal.get();
}
publicstaticvoidset(LoginValloginVal){
loginValThreadLocal.set(loginVal);
}
publicstaticvoidclear(){
loginValThreadLocal.remove();
}
}

4. TransmittableThreadLocal

TransmittableThreadLocal是阿里開源的工具,彌補了InheritableThreadLocal的缺陷,在使用線程池等會池化復用線程的執行組件情況下,提供ThreadLocal值的傳遞功能,解決異步執行時上下文傳遞的問題。

使用起來也是非常簡單,添加依賴如下:


com.alibaba
transmittable-thread-local
2.14.2

OauthContext改造代碼如下:

/**
*@description用戶上下文信息
*/
publicclassOauthContext{
privatestaticfinalTransmittableThreadLocalloginValThreadLocal=newTransmittableThreadLocal<>();

publicstaticLoginValget(){
returnloginValThreadLocal.get();
}
publicstaticvoidset(LoginValloginVal){
loginValThreadLocal.set(loginVal);
}
publicstaticvoidclear(){
loginValThreadLocal.remove();
}
}

TransmittableThreadLocal原理

從定義來看,TransimittableThreadLocal繼承于InheritableThreadLocal,并實現TtlCopier接口,它里面只有一個copy方法。所以主要是對InheritableThreadLocal的擴展。

publicclassTransmittableThreadLocalextendsInheritableThreadLocalimplementsTtlCopier

在TransimittableThreadLocal中添加holder屬性。這個屬性的作用就是被標記為具備線程傳遞資格的對象都會被添加到這個對象中。

要標記一個類,比較容易想到的方式,就是給這個類新增一個Type字段,還有一個方法就是將具備這種類型的的對象都添加到一個靜態全局集合中。之后使用時,這個集合里的所有值都具備這個標記。

//1.holder本身是一個InheritableThreadLocal對象
//2.這個holder對象的value是WeakHashMap,?>
//2.1WeekHashMap的value總是null,且不可能被使用。
//2.2WeekHasshMap支持value=null
privatestaticInheritableThreadLocal,?>>holder=newInheritableThreadLocal,?>>(){
@Override
protectedWeakHashMap,?>initialValue(){
returnnewWeakHashMap,Object>();
}

/**
*重寫了childValue方法,實現上直接將父線程的屬性作為子線程的本地變量對象。
*/
@Override
protectedWeakHashMap,?>childValue(WeakHashMap,?>parentValue){
returnnewWeakHashMap,Object>(parentValue);
}
};

應用代碼是通過TtlExecutors工具類對線程池對象進行包裝。工具類只是簡單的判斷,輸入的線程池是否已經被包裝過、非空校驗等,然后返回包裝類ExecutorServiceTtlWrapper。根據不同的線程池類型,有不同和的包裝類。

@Nullable
publicstaticExecutorServicegetTtlExecutorService(@NullableExecutorServiceexecutorService){
if(TtlAgent.isTtlAgentLoaded()||executorService==null||executorServiceinstanceofTtlEnhanced){
returnexecutorService;
}
returnnewExecutorServiceTtlWrapper(executorService);
}

進入包裝類ExecutorServiceTtlWrapper。可以注意到不論是通過ExecutorServiceTtlWrapper#submit方法或者是ExecutorTtlWrapper#execute方法,都會將線程對象包裝成TtlCallable或者TtlRunnable,用于在真正執行run方法前做一些業務邏輯。

/**
*在ExecutorServiceTtlWrapper實現submit方法
*/
@NonNull
@Override
publicFuturesubmit(@NonNullCallabletask){
returnexecutorService.submit(TtlCallable.get(task));
}

/**
*在ExecutorTtlWrapper實現execute方法
*/
@Override
publicvoidexecute(@NonNullRunnablecommand){
executor.execute(TtlRunnable.get(command));
}

所以,重點的核心邏輯應該是在TtlCallable#call()或者TtlRunnable#run()中。以下以TtlCallable為例,TtlRunnable同理類似。在分析call()方法之前,先看一個類Transmitter

publicstaticclassTransmitter{
/**
*捕獲當前線程中的是所有TransimittableThreadLocal和注冊ThreadLocal的值。
*/
@NonNull
publicstaticObjectcapture(){
returnnewSnapshot(captureTtlValues(),captureThreadLocalValues());
}

/**
*捕獲TransimittableThreadLocal的值,將holder中的所有值都添加到HashMap后返回。
*/
privatestaticHashMap,Object>captureTtlValues(){
HashMap,Object>ttl2Value=
newHashMap,Object>();
for(TransmittableThreadLocalthreadLocal:holder.get().keySet()){
ttl2Value.put(threadLocal,threadLocal.copyValue());
}
returnttl2Value;
}

/**
*捕獲注冊的ThreadLocal的值,也就是原本線程中的ThreadLocal,可以注冊到TTL中,在
*進行線程池本地變量傳遞時也會被傳遞。
*/
privatestaticHashMap,Object>captureThreadLocalValues(){
finalHashMap,Object>threadLocal2Value=
newHashMap,Object>();
for(Map.Entry,TtlCopier>entry:threadLocalHolder.entrySet()){
finalThreadLocalthreadLocal=entry.getKey();
finalTtlCopiercopier=entry.getValue();
threadLocal2Value.put(threadLocal,copier.copy(threadLocal.get()));
}
returnthreadLocal2Value;
}

/**
*將捕獲到的本地變量進行替換子線程的本地變量,并且返回子線程現有的本地變量副本backup。
*用于在執行run/call方法之后,將本地變量副本恢復。
*/
@NonNull
publicstaticObjectreplay(@NonNullObjectcaptured){
finalSnapshotcapturedSnapshot=(Snapshot)captured;
returnnewSnapshot(replayTtlValues(capturedSnapshot.ttl2Value),
replayThreadLocalValues(capturedSnapshot.threadLocal2Value));
}

/**
*替換TransmittableThreadLocal
*/
@NonNull
privatestaticHashMap,Object>replayTtlValues(@NonNullHashMap,Object>captured){
//創建副本backup
HashMap,Object>backup=
newHashMap,Object>();

for(finalIterator>iterator=holder.get().keySet().iterator();iterator.hasNext();){
TransmittableThreadLocalthreadLocal=iterator.next();
//對當前線程的本地變量進行副本拷貝
backup.put(threadLocal,threadLocal.get());

//若出現調用線程中不存在某個線程變量,而線程池中線程有,則刪除線程池中對應的本地變量
if(!captured.containsKey(threadLocal)){
iterator.remove();
threadLocal.superRemove();
}
}
//將捕獲的TTL值打入線程池獲取到的線程TTL中。
setTtlValuesTo(captured);
//是一個擴展點,調用TTL的beforeExecute方法。默認實現為空
doExecuteCallback(true);
returnbackup;
}

privatestaticHashMap,Object>replayThreadLocalValues(@NonNullHashMap,Object>captured){
finalHashMap,Object>backup=
newHashMap,Object>();
for(Map.Entry,Object>entry:captured.entrySet()){
finalThreadLocalthreadLocal=entry.getKey();
backup.put(threadLocal,threadLocal.get());
finalObjectvalue=entry.getValue();
if(value==threadLocalClearMark)threadLocal.remove();
elsethreadLocal.set(value);
}
returnbackup;
}

/**
*清除單線線程的所有TTL和TL,并返回清除之氣的backup
*/
@NonNull
publicstaticObjectclear(){
finalHashMap,Object>ttl2Value=
newHashMap,Object>();

finalHashMap,Object>threadLocal2Value=
newHashMap,Object>();
for(Map.Entry,TtlCopier>entry:threadLocalHolder.entrySet()){
finalThreadLocalthreadLocal=entry.getKey();
threadLocal2Value.put(threadLocal,threadLocalClearMark);
}
returnreplay(newSnapshot(ttl2Value,threadLocal2Value));
}

/**
*還原
*/
publicstaticvoidrestore(@NonNullObjectbackup){
finalSnapshotbackupSnapshot=(Snapshot)backup;
restoreTtlValues(backupSnapshot.ttl2Value);
restoreThreadLocalValues(backupSnapshot.threadLocal2Value);
}

privatestaticvoidrestoreTtlValues(@NonNullHashMap,Object>backup){
//擴展點,調用TTL的afterExecute
doExecuteCallback(false);

for(finalIterator>iterator=holder.get().keySet().iterator();iterator.hasNext();){
TransmittableThreadLocalthreadLocal=iterator.next();

if(!backup.containsKey(threadLocal)){
iterator.remove();
threadLocal.superRemove();
}
}

//將本地變量恢復成備份版本
setTtlValuesTo(backup);
}

privatestaticvoidsetTtlValuesTo(@NonNullHashMap,Object>ttlValues){
for(Map.Entry,Object>entry:ttlValues.entrySet()){
TransmittableThreadLocalthreadLocal=entry.getKey();
threadLocal.set(entry.getValue());
}
}

privatestaticvoidrestoreThreadLocalValues(@NonNullHashMap,Object>backup){
for(Map.Entry,Object>entry:backup.entrySet()){
finalThreadLocalthreadLocal=entry.getKey();
threadLocal.set(entry.getValue());
}
}

/**
*快照類,保存TTL和TL
*/
privatestaticclassSnapshot{
finalHashMap,Object>ttl2Value;
finalHashMap,Object>threadLocal2Value;

privateSnapshot(HashMap,Object>ttl2Value,
HashMap,Object>threadLocal2Value){
this.ttl2Value=ttl2Value;
this.threadLocal2Value=threadLocal2Value;
}
}

進入TtlCallable#call()方法。

@Override
publicVcall()throwsException{
Objectcaptured=capturedRef.get();
if(captured==null||releaseTtlValueReferenceAfterCall&&
!capturedRef.compareAndSet(captured,null)){
thrownewIllegalStateException("TTLvaluereferenceisreleasedaftercall!");
}
//調用replay方法將捕獲到的當前線程的本地變量,傳遞給線程池線程的本地變量,
//并且獲取到線程池線程覆蓋之前的本地變量副本。
Objectbackup=replay(captured);
try{
//線程方法調用
returncallable.call();
}finally{
//使用副本進行恢復。
restore(backup);
}
}

到這基本上線程池方式傳遞本地變量的核心代碼已經大概看完了。總的來說在創建TtlCallable對象是,調用capture()方法捕獲調用方的本地線程變量,在call()執行時,將捕獲到的線程變量,替換到線程池所對應獲取到的線程的本地變量中,并且在執行完成之后,將其本地變量恢復到調用之前。

總結

上述列舉了4種方案,陳某這里推薦方案2和方案4,其中兩種方案的缺點非常明顯,實際開發中也是采用的方案2或者方案4。

審核編輯:湯梓紅
聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。 舉報投訴
  • 接口
    +關注

    關注

    33

    文章

    8932

    瀏覽量

    153186
  • spring
    +關注

    關注

    0

    文章

    340

    瀏覽量

    14878
  • Boot
    +關注

    關注

    0

    文章

    153

    瀏覽量

    36539
  • 線程
    +關注

    關注

    0

    文章

    507

    瀏覽量

    20070
  • 數據傳遞
    +關注

    關注

    1

    文章

    3

    瀏覽量

    1801

原文標題:用這4招 優雅的實現Spring Boot 異步線程間數據傳遞

文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關注!文章轉載請注明出處。

收藏 人收藏

    評論

    相關推薦
    熱點推薦

    Spring Boot如何實現異步任務

    Spring Boot 提供了多種方式來實現異步任務,這里介紹三種主要實現方式。 1、基于注解 @Async @Async 注解是
    的頭像 發表于 09-30 10:32 ?1656次閱讀

    Spring Boot虛擬線程和Webflux性能對比

    早上看到一篇關于Spring Boot虛擬線程和Webflux性能對比的文章,覺得還不錯。內容較長,抓重點給大家介紹一下這篇文章的核心內容,方便大家快速閱讀。
    發表于 09-24 14:54 ?1177次閱讀
    <b class='flag-5'>Spring</b> <b class='flag-5'>Boot</b>虛擬<b class='flag-5'>線程</b>和Webflux性能對比

    LabVIEW多線程編程數據傳遞教程

    很多時候在一個VI的不同線程或者不同VI的不同線程中需要有一些交互——這些線程并不能完全獨立運行,需要一定的數據通信才能正確執行,這時就需要在編程時使用LabVIEW提供的
    的頭像 發表于 11-24 10:05 ?9205次閱讀
    LabVIEW多<b class='flag-5'>線程</b>編程<b class='flag-5'>數據傳遞</b>教程

    通過隊列實現vi之間數據傳遞

    `各位高手,請教下如何用隊列實現vi之間的數據傳遞,最好能給出個例子,我是初學者,謝謝`
    發表于 09-08 11:01

    請問C6678核間數據傳遞方式是什么?為什么是這樣?

    ,但是給的例子都是SYS/BIOS下面使用的。我想請問一下是否qmss,CPPI只能在操作系統下才能使用,沒有操作系統可以嗎?還有別的核間數據傳遞方式嗎?謝謝,請指教!
    發表于 06-19 02:42

    啟動Spring Boot項目應用的三種方法

    ,從而使開發人員不再需要定義樣板化的配置。我的話來理解,就是spring boot其實不是什么新的框架,它默認配置了很多框架的使用方式,就像maven整合了所有的jar包,spring
    發表于 01-14 17:33

    基于Spring Cloud和Euraka的優雅下線以及灰度發布

    該方式借助的是 Spring Boot 應用的 Shutdown hook,應用本身的下線也是優雅的,但如果你的服務發現組件使用的是 Eureka,那么默認最長會有 90 秒的延遲,其他應用才會感知到該服務下線
    的頭像 發表于 04-20 09:52 ?2151次閱讀

    Spring Boot Web相關的基礎知識

    Boot的第一個接口。接下來將會將會介紹使用Spring Boot開發Web應用的相關內容,其主要包括使用spring-boot-starter-web組件來
    的頭像 發表于 03-17 15:03 ?828次閱讀

    簡述Spring Boot數據校驗

    上一篇文章我們了解了Spring Boot Web相關的知識,初步了解了spring-boot-starter-web,還了解了@Contrler和@RestController的差別,如果
    的頭像 發表于 03-17 15:07 ?979次閱讀

    Spring Boot如何優雅實現數據加密存儲、模糊匹配和脫敏

    近來我們都在圍繞著使用Spring Boot開發業務系統時如何保證數據安全性這個主題展開總結,當下大部分的B/S架構的系統也都是基于Spring B
    的頭像 發表于 06-19 14:42 ?2396次閱讀
    <b class='flag-5'>Spring</b> <b class='flag-5'>Boot</b>如何<b class='flag-5'>優雅</b><b class='flag-5'>實現</b><b class='flag-5'>數據</b>加密存儲、模糊匹配和脫敏

    Spring Boot Actuator快速入門

    不知道大家在寫 Spring Boot 項目的過程中,使用過 Spring Boot Actuator 嗎?知道 Spring
    的頭像 發表于 10-09 17:11 ?820次閱讀

    Spring Boot啟動 Eureka流程

    在上篇中已經說過了 Eureka-Server 本質上是一個 web 應用的項目,今天就來看看 Spring Boot 是怎么啟動 Eureka 的。 Spring Boot 啟動 E
    的頭像 發表于 10-10 11:40 ?1129次閱讀
    <b class='flag-5'>Spring</b> <b class='flag-5'>Boot</b>啟動 Eureka流程

    Spring Boot的啟動原理

    可能很多初學者會比較困惑,Spring Boot 是如何做到將應用代碼和所有的依賴打包成一個獨立的 Jar 包,因為傳統的 Java 項目打包成 Jar 包之后,需要通過 -classpath 屬性
    的頭像 發表于 10-13 11:44 ?867次閱讀
    <b class='flag-5'>Spring</b> <b class='flag-5'>Boot</b>的啟動原理

    Spring Boot 的設計目標

    什么是Spring Boot Spring BootSpring 開源組織下的一個子項目,也是 S
    的頭像 發表于 10-13 14:56 ?745次閱讀
    <b class='flag-5'>Spring</b> <b class='flag-5'>Boot</b> 的設計目標

    Spring Boot 3.2支持虛擬線程和原生鏡像

    Spring Boot 3.2 前幾日發布,讓我們 Java 21、GraalVM 和虛擬線程來嘗試一下。
    的頭像 發表于 11-30 16:22 ?935次閱讀