前言
很多時候,我們?yōu)榱颂嵘?a target="_blank">接口的性能,會把之前單線程同步執(zhí)行的代碼,改成多線程異步執(zhí)行。
比如:查詢用戶信息接口,需要返回用戶基本信息、積分信息、成長值信息,而用戶、積分和成長值,需要調(diào)用不同的接口獲取數(shù)據(jù)。
如果查詢用戶信息接口,同步調(diào)用三個接口獲取數(shù)據(jù),會非常耗時。
這就非常有必要把三個接口調(diào)用,改成異步調(diào)用,最后匯總結(jié)果。
再比如:注冊用戶接口,該接口主要包含:寫用戶表,分配權(quán)限,配置用戶導(dǎo)航頁,發(fā)通知消息等功能。
該用戶注冊接口包含的業(yè)務(wù)邏輯比較多,如果在接口中同步執(zhí)行這些代碼,該接口響應(yīng)時間會非常慢。
這時就需要把業(yè)務(wù)邏輯梳理一下,劃分:核心邏輯和非核心邏輯。這個例子中的核心邏輯是:寫用戶表和分配權(quán)限,非核心邏輯是:配置用戶導(dǎo)航頁和發(fā)通知消息。
顯然核心邏輯必須在接口中同步執(zhí)行,而非核心邏輯可以多線程異步執(zhí)行。
等等。
需要使用多線程的業(yè)務(wù)場景太多了,使用多線程異步執(zhí)行的好處不言而喻。
但我要說的是,如果多線程沒有使用好,它也會給我們帶來很多意想不到的問題,不信往后繼續(xù)看。
今天跟大家一起聊聊,代碼改成多線程調(diào)用之后,帶來的9大問題。
1.獲取不到返回值
如果你通過直接繼承Thread類,或者實(shí)現(xiàn)Runnable接口的方式去創(chuàng)建線程。
那么,恭喜你,你將沒法獲取該線程方法的返回值。
使用線程的場景有兩種:
不需要關(guān)注線程方法的返回值。
需要關(guān)注線程方法的返回值。
大部分業(yè)務(wù)場景是不需要關(guān)注線程方法返回值的,但如果我們有些業(yè)務(wù)需要關(guān)注線程方法的返回值該怎么處理呢?
查詢用戶信息接口,需要返回用戶基本信息、積分信息、成長值信息,而用戶、積分和成長值,需要調(diào)用不同的接口獲取數(shù)據(jù)。
如下圖所示:
在Java8之前可以通過實(shí)現(xiàn)Callable接口,獲取線程返回結(jié)果。
Java8以后通過CompleteFuture類實(shí)現(xiàn)該功能。我們這里以CompleteFuture為例:
publicUserInfogetUserInfo(Longid)throwsInterruptedException,ExecutionException{ finalUserInfouserInfo=newUserInfo(); CompletableFutureuserFuture=CompletableFuture.supplyAsync(()->{ getRemoteUserAndFill(id,userInfo); returnBoolean.TRUE; },executor); CompletableFuturebonusFuture=CompletableFuture.supplyAsync(()->{ getRemoteBonusAndFill(id,userInfo); returnBoolean.TRUE; },executor); CompletableFuturegrowthFuture=CompletableFuture.supplyAsync(()->{ getRemoteGrowthAndFill(id,userInfo); returnBoolean.TRUE; },executor); CompletableFuture.allOf(userFuture,bonusFuture,growthFuture).join(); userFuture.get(); bonusFuture.get(); growthFuture.get(); returnuserInfo; }
溫馨提醒一下,這兩種方式別忘了使用線程池。示例中我用到了executor,表示自定義的線程池,為了防止高并發(fā)場景下,出現(xiàn)線程過多的問題。
此外,F(xiàn)ork/join框架也提供了執(zhí)行任務(wù)并返回結(jié)果的能力。
2.數(shù)據(jù)丟失
我們還是以注冊用戶接口為例,該接口主要包含:寫用戶表,分配權(quán)限,配置用戶導(dǎo)航頁,發(fā)通知消息等功能。
其中:寫用戶表和分配權(quán)限功能,需要在一個事務(wù)中同步執(zhí)行。而剩余的配置用戶導(dǎo)航頁和發(fā)通知消息功能,使用多線程異步執(zhí)行。
表面上看起來沒問題。
但如果前面的寫用戶表和分配權(quán)限功能成功了,用戶注冊接口就直接返回成功了。
但如果后面異步執(zhí)行的配置用戶導(dǎo)航頁,或發(fā)通知消息功能失敗了,怎么辦?
如下圖所示:
該接口前面明明已經(jīng)提示用戶成功了,但結(jié)果后面又有一部分功能在多線程異步執(zhí)行中失敗了。
這時該如何處理呢?
沒錯,你可以做失敗重試。
但如果重試了一定的次數(shù),還是沒有成功,這條請求數(shù)據(jù)該如何處理呢?如果不做任何處理,該數(shù)據(jù)是不是就丟掉了?
為了防止數(shù)據(jù)丟失,可以用如下方案:
使用mq異步處理。在分配權(quán)限之后,發(fā)送一條mq消息,到mq服務(wù)器,然后在mq的消費(fèi)者中使用多線程,去配置用戶導(dǎo)航頁和發(fā)通知消息。如果mq消費(fèi)者中處理失敗了,可以自己重試。
使用job異步處理。在分配權(quán)限之后,往任務(wù)表中寫一條數(shù)據(jù)。然后有個job定時掃描該表,然后配置用戶導(dǎo)航頁和發(fā)通知消息。如果job處理某條數(shù)據(jù)失敗了,可以在表中記錄一個重試次數(shù),然后不斷重試。但該方案有個缺點(diǎn),就是實(shí)時性可能不太高。
3.順序問題
如果你使用了多線程,就必須接受一個非常現(xiàn)實(shí)的問題,即順序問題。
假如之前代碼的執(zhí)行順序是:a,b,c,改成多線程執(zhí)行之后,代碼的執(zhí)行順序可能變成了:a,c,b。(這個跟cpu調(diào)度算法有關(guān))
例如:
publicstaticvoidmain(String[]args){ Threadthread1=newThread(()->System.out.println("a")); Threadthread2=newThread(()->System.out.println("b")); Threadthread3=newThread(()->System.out.println("c")); thread1.start(); thread2.start(); thread3.start(); }
執(zhí)行結(jié)果:
a c b
那么,來自靈魂的一問:如何保證線程的順序呢?
即線程啟動的順序是:a,b,c,執(zhí)行的順序也是:a,b,c。
如下圖所示:
3.1 join
Thread類的join方法它會讓主線程等待子線程運(yùn)行結(jié)束后,才能繼續(xù)運(yùn)行。
列如:
publicstaticvoidmain(String[]args)throwsInterruptedException{ Threadthread1=newThread(()->System.out.println("a")); Threadthread2=newThread(()->System.out.println("b")); Threadthread3=newThread(()->System.out.println("c")); thread1.start(); thread1.join(); thread2.start(); thread2.join(); thread3.start(); }
執(zhí)行結(jié)果永遠(yuǎn)都是:
a b c
3.2 newSingleThreadExecutor
我們可以使用JDK自帶的Excutors類的newSingleThreadExecutor方法,創(chuàng)建一個單線程的線程池。
例如:
publicstaticvoidmain(String[]args){ ExecutorServiceexecutorService=Executors.newSingleThreadExecutor(); Threadthread1=newThread(()->System.out.println("a")); Threadthread2=newThread(()->System.out.println("b")); Threadthread3=newThread(()->System.out.println("c")); executorService.submit(thread1); executorService.submit(thread2); executorService.submit(thread3); executorService.shutdown(); }
執(zhí)行結(jié)果永遠(yuǎn)都是:
a b c
使用Excutors類的newSingleThreadExecutor方法創(chuàng)建的單線程的線程池,使用了LinkedBlockingQueue作為隊(duì)列,而此隊(duì)列按 FIFO(先進(jìn)先出)排序元素。
添加到隊(duì)列的順序是a,b,c,則執(zhí)行的順序也是a,b,c。
3.3 CountDownLatch
CountDownLatch是一個同步工具類,它允許一個或多個線程一直等待,直到其他線程執(zhí)行完后再執(zhí)行。
例如:
publicclassThreadTest{ publicstaticvoidmain(String[]args)throwsInterruptedException{ CountDownLatchlatch1=newCountDownLatch(0); CountDownLatchlatch2=newCountDownLatch(1); CountDownLatchlatch3=newCountDownLatch(1); Threadthread1=newThread(newTestRunnable(latch1,latch2,"a")); Threadthread2=newThread(newTestRunnable(latch2,latch3,"b")); Threadthread3=newThread(newTestRunnable(latch3,latch3,"c")); thread1.start(); thread2.start(); thread3.start(); } } classTestRunnableimplementsRunnable{ privateCountDownLatchlatch1; privateCountDownLatchlatch2; privateStringmessage; TestRunnable(CountDownLatchlatch1,CountDownLatchlatch2,Stringmessage){ this.latch1=latch1; this.latch2=latch2; this.message=message; } @Override publicvoidrun(){ try{ latch1.await(); System.out.println(message); }catch(InterruptedExceptione){ e.printStackTrace(); } latch2.countDown(); } }
執(zhí)行結(jié)果永遠(yuǎn)都是:
a b c
此外,使用CompletableFuture的thenRun方法,也能多線程的執(zhí)行順序,在這里就不一一介紹了。
4.線程安全問題
既然使用了線程,伴隨而來的還會有線程安全問題。
假如現(xiàn)在有這樣一個需求:用多線程執(zhí)行查詢方法,然后把執(zhí)行結(jié)果添加到一個list集合中。
代碼如下:
Listlist=Lists.newArrayList(); dataList.stream() .map(data->CompletableFuture .supplyAsync(()->query(list,data),asyncExecutor) )); CompletableFuture.allOf(futureArray).join();
使用CompletableFuture異步多線程執(zhí)行query方法:
publicvoidquery(Listlist,UserEntitycondition){ Useruser=queryByCondition(condition); if(Objects.isNull(user)){ return; } list.add(user); UserExtenduserExtend=queryByOther(condition); if(Objects.nonNull(userExtend)){ user.setExtend(userExtend.getInfo()); } }
在query方法中,將獲取的查詢結(jié)果添加到list集合中。
結(jié)果list會出現(xiàn)線程安全問題,有時候會少數(shù)據(jù),當(dāng)然也不一定是必現(xiàn)的。
這是因?yàn)锳rrayList是非線程安全的,沒有使用synchronized等關(guān)鍵字修飾。
如何解決這個問題呢?
答:使用CopyOnWriteArrayList集合,代替普通的ArrayList集合,CopyOnWriteArrayList是一個線程安全的機(jī)會。
只需一行小小的改動即可:
ListlistLists.newCopyOnWriteArrayList();
溫馨的提醒一下,這里創(chuàng)建集合的方式,用了google的collect包。
5.ThreadLocal獲取數(shù)據(jù)異常
我們都知道JDK為了解決線程安全問題,提供了一種用空間換時間的新思路:ThreadLocal。
它的核心思想是:共享變量在每個線程都有一個副本,每個線程操作的都是自己的副本,對另外的線程沒有影響。
例如:
@Service publicclassThreadLocalService{ privatestaticfinalThreadLocalthreadLocal=newThreadLocal<>(); publicvoidadd(){ threadLocal.set(1); doSamething(); Integerinteger=threadLocal.get(); } }
ThreadLocal在普通中線程中,的確能夠獲取正確的數(shù)據(jù)。
但在真實(shí)的業(yè)務(wù)場景中,一般很少用單獨(dú)的線程,絕大多數(shù),都是用的線程池。
那么,在線程池中如何獲取ThreadLocal對象生成的數(shù)據(jù)呢?
如果直接使用普通ThreadLocal,顯然是獲取不到正確數(shù)據(jù)的。
我們先試試InheritableThreadLocal,具體代碼如下:
privatestaticvoidfun1(){ InheritableThreadLocalthreadLocal=newInheritableThreadLocal<>(); threadLocal.set(6); System.out.println("父線程獲取數(shù)據(jù):"+threadLocal.get()); ExecutorServiceexecutorService=Executors.newSingleThreadExecutor(); threadLocal.set(6); executorService.submit(()->{ System.out.println("第一次從線程池中獲取數(shù)據(jù):"+threadLocal.get()); }); threadLocal.set(7); executorService.submit(()->{ System.out.println("第二次從線程池中獲取數(shù)據(jù):"+threadLocal.get()); }); }
執(zhí)行結(jié)果:
父線程獲取數(shù)據(jù):6 第一次從線程池中獲取數(shù)據(jù):6 第二次從線程池中獲取數(shù)據(jù):6
由于這個例子中使用了單例線程池,固定線程數(shù)是1。
第一次submit任務(wù)的時候,該線程池會自動創(chuàng)建一個線程。因?yàn)槭褂昧薎nheritableThreadLocal,所以創(chuàng)建線程時,會調(diào)用它的init方法,將父線程中的inheritableThreadLocals數(shù)據(jù)復(fù)制到子線程中。所以我們看到,在主線程中將數(shù)據(jù)設(shè)置成6,第一次從線程池中獲取了正確的數(shù)據(jù)6。
之后,在主線程中又將數(shù)據(jù)改成7,但在第二次從線程池中獲取數(shù)據(jù)卻依然是6。
因?yàn)榈诙蝧ubmit任務(wù)的時候,線程池中已經(jīng)有一個線程了,就直接拿過來復(fù)用,不會再重新創(chuàng)建線程了。所以不會再調(diào)用線程的init方法,所以第二次其實(shí)沒有獲取到最新的數(shù)據(jù)7,還是獲取的老數(shù)據(jù)6。
那么,這該怎么辦呢?
答:使用TransmittableThreadLocal,它并非JDK自帶的類,而是阿里巴巴開源jar包中的類。
可以通過如下pom文件引入該jar包:
com.alibaba transmittable-thread-local 2.11.0 compile
代碼調(diào)整如下:
privatestaticvoidfun2()throwsException{ TransmittableThreadLocalthreadLocal=newTransmittableThreadLocal<>(); threadLocal.set(6); System.out.println("父線程獲取數(shù)據(jù):"+threadLocal.get()); ExecutorServicettlExecutorService=TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(1)); threadLocal.set(6); ttlExecutorService.submit(()->{ System.out.println("第一次從線程池中獲取數(shù)據(jù):"+threadLocal.get()); }); threadLocal.set(7); ttlExecutorService.submit(()->{ System.out.println("第二次從線程池中獲取數(shù)據(jù):"+threadLocal.get()); }); }
執(zhí)行結(jié)果:
父線程獲取數(shù)據(jù):6 第一次從線程池中獲取數(shù)據(jù):6 第二次從線程池中獲取數(shù)據(jù):7
我們看到,使用了TransmittableThreadLocal之后,第二次從線程中也能正確獲取最新的數(shù)據(jù)7了。
nice。
如果你仔細(xì)觀察這個例子,你可能會發(fā)現(xiàn),代碼中除了使用TransmittableThreadLocal類之外,還使用了TtlExecutors.getTtlExecutorService方法,去創(chuàng)建ExecutorService對象。
這是非常重要的地方,如果沒有這一步,TransmittableThreadLocal在線程池中共享數(shù)據(jù)將不會起作用。
創(chuàng)建ExecutorService對象,底層的submit方法會TtlRunnable或TtlCallable對象。
以TtlRunnable類為例,它實(shí)現(xiàn)了Runnable接口,同時還實(shí)現(xiàn)了它的run方法:
publicvoidrun(){ Map,Object>copied=(Map)this.copiedRef.get(); if(copied!=null&&(!this.releaseTtlValueReferenceAfterRun||this.copiedRef.compareAndSet(copied,(Object)null))){ Mapbackup=TransmittableThreadLocal.backupAndSetToCopied(copied); try{ this.runnable.run(); }finally{ TransmittableThreadLocal.restoreBackup(backup); } }else{ thrownewIllegalStateException("TTLvaluereferenceisreleasedafterrun!"); } }
這段代碼的主要邏輯如下:
把當(dāng)時的ThreadLocal做個備份,然后將父類的ThreadLocal拷貝過來。
執(zhí)行真正的run方法,可以獲取到父類最新的ThreadLocal數(shù)據(jù)。
從備份的數(shù)據(jù)中,恢復(fù)當(dāng)時的ThreadLocal數(shù)據(jù)。
6.OOM問題
眾所周知,使用多線程可以提升代碼執(zhí)行效率,但也不是絕對的。
對于一些耗時的操作,使用多線程,確實(shí)可以提升代碼執(zhí)行效率。
但線程不是創(chuàng)建越多越好,如果線程創(chuàng)建多了,也可能會導(dǎo)致OOM異常。
例如:
Causedby: java.lang.OutOfMemoryError:unabletocreatenewnativethread
在JVM中創(chuàng)建一個線程,默認(rèn)需要占用1M的內(nèi)存空間。
如果創(chuàng)建了過多的線程,必然會導(dǎo)致內(nèi)存空間不足,從而出現(xiàn)OOM異常。
除此之外,如果使用線程池的話,特別是使用固定大小線程池,即使用Executors.newFixedThreadPool方法創(chuàng)建的線程池。
該線程池的核心線程數(shù)和最大線程數(shù)是一樣的,是一個固定值,而存放消息的隊(duì)列是LinkedBlockingQueue。
該隊(duì)列的最大容量是Integer.MAX_VALUE,也就是說如果使用固定大小線程池,存放了太多的任務(wù),有可能也會導(dǎo)致OOM異常。
java.lang.OutOfMemeryError:Javaheapspace
7.CPU使用率飆高
不知道你有沒有做過excel數(shù)據(jù)導(dǎo)入功能,需要將一批excel的數(shù)據(jù)導(dǎo)入到系統(tǒng)中。
每條數(shù)據(jù)都有些業(yè)務(wù)邏輯,如果單線程導(dǎo)入所有的數(shù)據(jù),導(dǎo)入效率會非常低。
于是改成了多線程導(dǎo)入。
如果excel中有大量的數(shù)據(jù),很可能會出現(xiàn)CPU使用率飆高的問題。
我們都知道,如果代碼出現(xiàn)死循環(huán),cpu使用率會飚的很多高。因?yàn)榇a一直在某個線程中循環(huán),沒法切換到其他線程,cpu一直被占用著,所以會導(dǎo)致cpu使用率一直高居不下。
而多線程導(dǎo)入大量的數(shù)據(jù),雖說沒有死循環(huán)代碼,但由于多個線程一直在不停的處理數(shù)據(jù),導(dǎo)致占用了cpu很長的時間。
也會出現(xiàn)cpu使用率很高的問題。
那么,如何解決這個問題呢?
答:使用Thread.sleep休眠一下。
在線程中處理完一條數(shù)據(jù),休眠10毫秒。
當(dāng)然CPU使用率飆高的原因很多,多線程處理數(shù)據(jù)和死循環(huán)只是其中兩種,還有比如:頻繁GC、正則匹配、頻繁序列化和反序列化等。
后面我會寫一篇介紹CPU使用率飆高的原因的專題文章,感興趣的小伙伴,可以關(guān)注一下我后續(xù)的文章。
8.事務(wù)問題
在實(shí)際項(xiàng)目開發(fā)中,多線程的使用場景還是挺多的。如果spring事務(wù)用在多線程場景中,會有問題嗎?
例如:
@Slf4j @Service publicclassUserService{ @Autowired privateUserMapperuserMapper; @Autowired privateRoleServiceroleService; @Transactional publicvoidadd(UserModeluserModel)throwsException{ userMapper.insertUser(userModel); newThread(()->{ roleService.doOtherThing(); }).start(); } } @Service publicclassRoleService{ @Transactional publicvoiddoOtherThing(){ System.out.println("保存role表數(shù)據(jù)"); } }
從上面的例子中,我們可以看到事務(wù)方法add中,調(diào)用了事務(wù)方法doOtherThing,但是事務(wù)方法doOtherThing是在另外一個線程中調(diào)用的。
這樣會導(dǎo)致兩個方法不在同一個線程中,獲取到的數(shù)據(jù)庫連接不一樣,從而是兩個不同的事務(wù)。如果想doOtherThing方法中拋了異常,add方法也回滾是不可能的。
如果看過spring事務(wù)源碼的朋友,可能會知道spring的事務(wù)是通過數(shù)據(jù)庫連接來實(shí)現(xiàn)的。當(dāng)前線程中保存了一個map,key是數(shù)據(jù)源,value是數(shù)據(jù)庫連接。
privatestaticfinalThreadLocal
我們說的同一個事務(wù),其實(shí)是指同一個數(shù)據(jù)庫連接,只有擁有同一個數(shù)據(jù)庫連接才能同時提交和回滾。如果在不同的線程,拿到的數(shù)據(jù)庫連接肯定是不一樣的,所以是不同的事務(wù)。
所以不要在事務(wù)中開啟另外的線程,去處理業(yè)務(wù)邏輯,這樣會導(dǎo)致事務(wù)失效。
9.導(dǎo)致服務(wù)掛掉
使用多線程會導(dǎo)致服務(wù)掛掉,這不是危言聳聽,而是確有其事。
假設(shè)現(xiàn)在有這樣一種業(yè)務(wù)場景:在mq的消費(fèi)者中需要調(diào)用訂單查詢接口,查到數(shù)據(jù)之后,寫入業(yè)務(wù)表中。
本來是沒啥問題的。
突然有一天,mq生產(chǎn)者跑了一個批量數(shù)據(jù)處理的job,導(dǎo)致mq服務(wù)器上堆積了大量的消息。
此時,mq消費(fèi)者的處理速度,遠(yuǎn)遠(yuǎn)跟不上mq消息的生產(chǎn)速度,導(dǎo)致的結(jié)果是出現(xiàn)了大量的消息堆積,對用戶有很大的影響。
為了解決這個問題,mq消費(fèi)者改成多線程處理,直接使用了線程池,并且最大線程數(shù)配置成了20。
這樣調(diào)整之后,消息堆積問題確實(shí)得到了解決。
但帶來了另外一個更嚴(yán)重的問題:訂單查詢接口并發(fā)量太大了,有點(diǎn)扛不住壓力,導(dǎo)致部分節(jié)點(diǎn)的服務(wù)直接掛掉。
為了解決問題,不得不臨時加服務(wù)節(jié)點(diǎn)。
在mq的消費(fèi)者中使用多線程,調(diào)用接口時,一定要評估好接口能夠承受的最大訪問量,防止因?yàn)閴毫^大,而導(dǎo)致服務(wù)掛掉的問題。
審核編輯:劉清
-
cpu
+關(guān)注
關(guān)注
68文章
11075瀏覽量
216974 -
服務(wù)器
+關(guān)注
關(guān)注
13文章
9786瀏覽量
87907 -
JAVA
+關(guān)注
關(guān)注
20文章
2989瀏覽量
109528
原文標(biāo)題:麻了,代碼改成多線程,竟有9大問題
文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
多線程的安全注意事項(xiàng)
鴻蒙5開發(fā)寶藏案例分享---跨線程性能優(yōu)化指南
main線程的棧大小設(shè)置成2048的時候rt_memset導(dǎo)致hardfault,為什么?
工控一體機(jī)多線程任務(wù)調(diào)度優(yōu)化:聚徽分享破解工業(yè)復(fù)雜流程高效協(xié)同密碼
一種實(shí)時多線程VSLAM框架vS-Graphs介紹

請問如何在Python中實(shí)現(xiàn)多線程與多進(jìn)程的協(xié)作?
鴻蒙文件傳輸三方庫上線開源鴻蒙社區(qū) 十行代碼實(shí)現(xiàn)大文件高速傳輸
請問rt-thread studio如何進(jìn)行多線程編譯?
探索字節(jié)隊(duì)列的魔法:多類型支持、函數(shù)重載與線程安全

socket 多線程編程實(shí)現(xiàn)方法
Python中多線程和多進(jìn)程的區(qū)別

評論