背景介紹
1,最近有一個(gè)大數(shù)據(jù)量插入的操作入庫(kù)的業(yè)務(wù)場(chǎng)景,需要先做一些其他修改操作,然后在執(zhí)行插入操作,由于插入數(shù)據(jù)可能會(huì)很多,用到多線程去拆分?jǐn)?shù)據(jù)并行處理來(lái)提高響應(yīng)時(shí)間,如果有一個(gè)線程執(zhí)行失敗,則全部回滾。
2,在spring中可以使用@Transactional
注解去控制事務(wù),使出現(xiàn)異常時(shí)會(huì)進(jìn)行回滾,在多線程中,這個(gè)注解則不會(huì)生效,如果主線程需要先執(zhí)行一些修改數(shù)據(jù)庫(kù)的操作,當(dāng)子線程在進(jìn)行處理出現(xiàn)異常時(shí),主線程修改的數(shù)據(jù)則不會(huì)回滾,導(dǎo)致數(shù)據(jù)錯(cuò)誤。
3,下面用一個(gè)簡(jiǎn)單示例演示多線程事務(wù)。
基于 Spring Boot + MyBatis Plus + Vue & Element 實(shí)現(xiàn)的后臺(tái)管理系統(tǒng) + 用戶(hù)小程序,支持 RBAC 動(dòng)態(tài)權(quán)限、多租戶(hù)、數(shù)據(jù)權(quán)限、工作流、三方登錄、支付、短信、商城等功能
- 項(xiàng)目地址:https://github.com/YunaiV/ruoyi-vue-pro
- 視頻教程:https://doc.iocoder.cn/video/
公用的類(lèi)和方法
/**
*平均拆分list方法.
*@paramsource
*@paramn
*@param
*@return
*/
publicstaticList>averageAssign(Listsource,intn){
List>result=newArrayList>();
intremaider=source.size()%n;
intnumber=source.size()/n;
intoffset=0;//偏移量
for(inti=0;ivalue=null;
if(remaider>0){
value=source.subList(i*number+offset,(i+1)*number+offset+1);
remaider--;
offset++;
}else{
value=source.subList(i*number+offset,(i+1)*number+offset);
}
result.add(value);
}
returnresult;
}
/**線程池配置
*@versionV1.0
*/
publicclassExecutorConfig{
privatestaticintmaxPoolSize=Runtime.getRuntime().availableProcessors();
privatevolatilestaticExecutorServiceexecutorService;
publicstaticExecutorServicegetThreadPool(){
if(executorService==null){
synchronized(ExecutorConfig.class){
if(executorService==null){
executorService=newThreadPool();
}
}
}
returnexecutorService;
}
privatestaticExecutorServicenewThreadPool(){
intqueueSize=500;
intcorePool=Math.min(5,maxPoolSize);
returnnewThreadPoolExecutor(corePool,maxPoolSize,10000L,TimeUnit.MILLISECONDS,
newLinkedBlockingQueue<>(queueSize),newThreadPoolExecutor.AbortPolicy());
}
privateExecutorConfig(){}
}
/**獲取sqlSession
*@author86182
*@versionV1.0
*/
@Component
publicclassSqlContext{
@Resource
privateSqlSessionTemplatesqlSessionTemplate;
publicSqlSessiongetSqlSession(){
SqlSessionFactorysqlSessionFactory=sqlSessionTemplate.getSqlSessionFactory();
returnsqlSessionFactory.openSession();
}
}
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 實(shí)現(xiàn)的后臺(tái)管理系統(tǒng) + 用戶(hù)小程序,支持 RBAC 動(dòng)態(tài)權(quán)限、多租戶(hù)、數(shù)據(jù)權(quán)限、工作流、三方登錄、支付、短信、商城等功能
示例事務(wù)不成功操作
/**
*測(cè)試多線程事務(wù).
*@paramemployeeDOList
*/
@Override
@Transactional
publicvoidsaveThread(ListemployeeDOList) {
try{
//先做刪除操作,如果子線程出現(xiàn)異常,此操作不會(huì)回滾
this.getBaseMapper().delete(null);
//獲取線程池
ExecutorServiceservice=ExecutorConfig.getThreadPool();
//拆分?jǐn)?shù)據(jù),拆分5份
List>lists=averageAssign(employeeDOList,5);
//執(zhí)行的線程
Thread[]threadArray=newThread[lists.size()];
//監(jiān)控子線程執(zhí)行完畢,再執(zhí)行主線程,要不然會(huì)導(dǎo)致主線程關(guān)閉,子線程也會(huì)隨著關(guān)閉
CountDownLatchcountDownLatch=newCountDownLatch(lists.size());
AtomicBooleanatomicBoolean=newAtomicBoolean(true);
for(inti=0;iif(i==lists.size()-1){
atomicBoolean.set(false);
}
Listlist=lists.get(i);
threadArray[i]=newThread(()->{
try{
//最后一個(gè)線程拋出異常
if(!atomicBoolean.get()){
thrownewServiceException("001","出現(xiàn)異常");
}
//批量添加,mybatisPlus中自帶的batch方法
this.saveBatch(list);
}finally{
countDownLatch.countDown();
}
});
}
for(inti=0;i//當(dāng)子線程執(zhí)行完畢時(shí),主線程再往下執(zhí)行
countDownLatch.await();
System.out.println("添加完畢");
}catch(Exceptione){
log.info("error",e);
thrownewServiceException("002","出現(xiàn)異常");
}finally{
connection.close();
}
}
數(shù)據(jù)庫(kù)中存在一條數(shù)據(jù):

//測(cè)試用例
@RunWith(SpringRunner.class)
@SpringBootTest(classes={ThreadTest01.class,MainApplication.class})
publicclassThreadTest01{
@Resource
privateEmployeeBOemployeeBO;
/**
*測(cè)試多線程事務(wù).
*@throwsInterruptedException
*/
@Test
publicvoidMoreThreadTest2()throwsInterruptedException{
intsize=10;
ListemployeeDOList=newArrayList<>(size);
for(inti=0;inewEmployeeDO();
employeeDO.setEmployeeName("lol"+i);
employeeDO.setAge(18);
employeeDO.setGender(1);
employeeDO.setIdNumber(i+"XX");
employeeDO.setCreatTime(Calendar.getInstance().getTime());
employeeDOList.add(employeeDO);
}
try{
employeeBO.saveThread(employeeDOList);
System.out.println("添加成功");
}catch(Exceptione){
e.printStackTrace();
}
}
}
測(cè)試結(jié)果:


可以發(fā)現(xiàn)子線程組執(zhí)行時(shí),有一個(gè)線程執(zhí)行失敗,其他線程也會(huì)拋出異常,但是主線程中執(zhí)行的刪除操作,沒(méi)有回滾,@Transactional
注解沒(méi)有生效。
使用sqlSession
控制手動(dòng)提交事務(wù)
@Resource
SqlContextsqlContext;
/**
*測(cè)試多線程事務(wù).
*@paramemployeeDOList
*/
@Override
publicvoidsaveThread(ListemployeeDOList) throwsSQLException{
//獲取數(shù)據(jù)庫(kù)連接,獲取會(huì)話(內(nèi)部自有事務(wù))
SqlSessionsqlSession=sqlContext.getSqlSession();
Connectionconnection=sqlSession.getConnection();
try{
//設(shè)置手動(dòng)提交
connection.setAutoCommit(false);
//獲取mapper
EmployeeMapperemployeeMapper=sqlSession.getMapper(EmployeeMapper.class);
//先做刪除操作
employeeMapper.delete(null);
//獲取執(zhí)行器
ExecutorServiceservice=ExecutorConfig.getThreadPool();
List>callableList=newArrayList<>();
//拆分list
List>lists=averageAssign(employeeDOList,5);
AtomicBooleanatomicBoolean=newAtomicBoolean(true);
for(inti=0;iif(i==lists.size()-1){
atomicBoolean.set(false);
}
Listlist=lists.get(i);
//使用返回結(jié)果的callable去執(zhí)行,
Callablecallable=()->{
//讓最后一個(gè)線程拋出異常
if(!atomicBoolean.get()){
thrownewServiceException("001","出現(xiàn)異常");
}
returnemployeeMapper.saveBatch(list);
};
callableList.add(callable);
}
//執(zhí)行子線程
List>futures=service.invokeAll(callableList);
for(Futurefuture:futures){
//如果有一個(gè)執(zhí)行不成功,則全部回滾
if(future.get()<=0){
connection.rollback();
return;
}
}
connection.commit();
System.out.println("添加完畢");
}catch(Exceptione){
connection.rollback();
log.info("error",e);
thrownewServiceException("002","出現(xiàn)異常");
}finally{
connection.close();
}
}
//sql
"saveBatch"parameterType="List">
INSERTINTO
employee(employee_id,age,employee_name,birth_date,gender,id_number,creat_time,update_time,status)
values
="list"item="item"index="index"separator=",">
(
#{item.employeeId},
#{item.age},
#{item.employeeName},
#{item.birthDate},
#{item.gender},
#{item.idNumber},
#{item.creatTime},
#{item.updateTime},
#{item.status}
)
數(shù)據(jù)庫(kù)中一條數(shù)據(jù):

測(cè)試結(jié)果:拋出異常,

刪除操作的數(shù)據(jù)回滾了,數(shù)據(jù)庫(kù)中的數(shù)據(jù)依舊存在,說(shuō)明事務(wù)成功了。

成功操作示例:
@Resource
SqlContextsqlContext;
/**
*測(cè)試多線程事務(wù).
*@paramemployeeDOList
*/
@Override
publicvoidsaveThread(ListemployeeDOList) throwsSQLException{
//獲取數(shù)據(jù)庫(kù)連接,獲取會(huì)話(內(nèi)部自有事務(wù))
SqlSessionsqlSession=sqlContext.getSqlSession();
Connectionconnection=sqlSession.getConnection();
try{
//設(shè)置手動(dòng)提交
connection.setAutoCommit(false);
EmployeeMapperemployeeMapper=sqlSession.getMapper(EmployeeMapper.class);
//先做刪除操作
employeeMapper.delete(null);
ExecutorServiceservice=ExecutorConfig.getThreadPool();
List>callableList=newArrayList<>();
List>lists=averageAssign(employeeDOList,5);
for(inti=0;ilist=lists.get(i);
Callablecallable=()->employeeMapper.saveBatch(list);
callableList.add(callable);
}
//執(zhí)行子線程
List>futures=service.invokeAll(callableList);
for(Futurefuture:futures){
if(future.get()<=0){
connection.rollback();
return;
}
}
connection.commit();
System.out.println("添加完畢");
}catch(Exceptione){
connection.rollback();
log.info("error",e);
thrownewServiceException("002","出現(xiàn)異常");
//thrownewServiceException(ExceptionCodeEnum.EMPLOYEE_SAVE_OR_UPDATE_ERROR);
}
}
測(cè)試結(jié)果:

數(shù)據(jù)庫(kù)中數(shù)據(jù):
刪除的刪除了,添加的添加成功了,測(cè)試成功。

審核編輯 :李倩
-
多線程
+關(guān)注
關(guān)注
0文章
279瀏覽量
20308 -
spring
+關(guān)注
關(guān)注
0文章
340瀏覽量
14879
原文標(biāo)題:支付寶:多線程事務(wù)怎么回滾?說(shuō)用 @Transactional 可以回去等通知了!
文章出處:【微信號(hào):芋道源碼,微信公眾號(hào):芋道源碼】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。
發(fā)布評(píng)論請(qǐng)先 登錄
復(fù)旦微電子與支付寶推出“碰一下”射頻芯片
請(qǐng)問(wèn)rt-thread studio如何進(jìn)行多線程編譯?
支付寶發(fā)布新一代AI視覺(jué)搜索“探一下”
socket 多線程編程實(shí)現(xiàn)方法
支付寶與華為終端達(dá)成戰(zhàn)略合作
eBay攜手螞蟻國(guó)際旗下Antom,支付寶成eBay新支付選項(xiàng)
支付寶與華為終端達(dá)成戰(zhàn)略合作,共同推動(dòng)移動(dòng)支付進(jìn)入“碰時(shí)代”
Spring事務(wù)實(shí)現(xiàn)原理

Python中多線程和多進(jìn)程的區(qū)別

復(fù)旦微電 Boost Tag 芯片助力支付寶NFC支付
歡創(chuàng)播報(bào) 支付寶“碰一下”正式發(fā)布

從多線程設(shè)計(jì)模式到對(duì) CompletableFuture 的應(yīng)用

智能手機(jī)移動(dòng)支付功能的最佳選擇TG2520SMN溫補(bǔ)晶振X1G005421030427

利用Swap模式實(shí)現(xiàn)代碼回滾操作

評(píng)論