多线程事务怎么回滚

1、背景介绍

1,最近有一个大数据量插入的操作入库的业务场景,需要先做一些其他修改操作,然后在执行插入操作,由于插入数据可能会很多,用到多线程去拆分数据并行处理来提高响应时间,如果有一个线程执行失败,则全部回滚。

2,在spring中可以使用@Transactional注解去控制事务,使出现异常时会进行回滚,在多线程中,这个注解则不会生效,如果主线程需要先执行一些修改数据库的操作,当子线程在进行处理出现异常时,主线程修改的数据则不会回滚,导致数据错误。

3,下面用一个简单示例演示多线程事务。

2、公用的类和方法

/**
 * 平均拆分list方法.
 * @param source
 * @param n
 * @param <T>
 * @return
 */
public static <T> List<List<T>> averageAssign(List<T> source,int n){
    List<List<T>> result=new ArrayList<List<T>>();
    int remaider=source.size()%n; 
    int number=source.size()/n; 
    int offset=0;//偏移量
    for(int i=0;i<n;i++){
        List<T> value=null;
        if(remaider>0){
            value=source.subList(i*number+offset, (i+1)*number+offset+1);
            remaider--;
            offset++;
        }else{
            value=source.subList(i*number+offset, (i+1)*number+offset);
        }
        result.add(value);
    }
    return result;
}
/**  线程池配置
 * @version V1.0
 */
public class ExecutorConfig {
    private static int maxPoolSize = Runtime.getRuntime().availableProcessors();
    private volatile static ExecutorService executorService;
    public static ExecutorService getThreadPool() {
        if (executorService == null){
            synchronized (ExecutorConfig.class){
                if (executorService == null){
                    executorService =  newThreadPool();
                }
            }
        }
        return executorService;
    }

    private static  ExecutorService newThreadPool(){
        int queueSize = 500;
        int corePool = Math.min(5, maxPoolSize);
        return new ThreadPoolExecutor(corePool, maxPoolSize, 10000L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(queueSize),new ThreadPoolExecutor.AbortPolicy());
    }
    private ExecutorConfig(){}
}
/** 获取sqlSession
 * @author 86182
 * @version V1.0
 */
@Component
public class SqlContext {
    @Resource
    private SqlSessionTemplate sqlSessionTemplate;

    public SqlSession getSqlSession(){
        SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory();
        return sqlSessionFactory.openSession();
    }
}

3、示例事务不成功操作

  /**
 * 测试多线程事务.
 * @param employeeDOList
 */
@Override
@Transactional
public void saveThread(List<EmployeeDO> employeeDOList) {
    try {
        //先做删除操作,如果子线程出现异常,此操作不会回滚
        this.getBaseMapper().delete(null);
        //获取线程池
        ExecutorService service = ExecutorConfig.getThreadPool();
        //拆分数据,拆分5份
        List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);
        //执行的线程
        Thread []threadArray = new Thread[lists.size()];
        //监控子线程执行完毕,再执行主线程,要不然会导致主线程关闭,子线程也会随着关闭
        CountDownLatch countDownLatch = new CountDownLatch(lists.size());
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        for (int i =0;i<lists.size();i++){
            if (i==lists.size()-1){
                atomicBoolean.set(false);
            }
            List<EmployeeDO> list  = lists.get(i);
            threadArray[i] =  new Thread(() -> {
                try {
                 //最后一个线程抛出异常
                    if (!atomicBoolean.get()){
                        throw new ServiceException("001","出现异常");
                    }
                    //批量添加,mybatisPlus中自带的batch方法
                    this.saveBatch(list);
                }finally {
                    countDownLatch.countDown();
                }

            });
        }
        for (int i = 0; i <lists.size(); i++){
            service.execute(threadArray[i]);
        }
        //当子线程执行完毕时,主线程再往下执行
        countDownLatch.await();
        System.out.println("添加完毕");
    }catch (Exception e){
        log.info("error",e);
        throw new ServiceException("002","出现异常");
    }finally {
         connection.close();
     }
}

数据库中存在一条数据

//测试用例
@RunWith(SpringRunner.class)
@SpringBootTest(classes = { ThreadTest01.class, MainApplication.class})
public class ThreadTest01 {

    @Resource
    private EmployeeBO employeeBO;

    /**
     *   测试多线程事务.
     * @throws InterruptedException
     */
    @Test
    public  void MoreThreadTest2() throws InterruptedException {
        int size = 10;
        List<EmployeeDO> employeeDOList = new ArrayList<>(size);
        for (int i = 0; i<size;i++){
            EmployeeDO employeeDO = new EmployeeDO();
            employeeDO.setEmployeeName("lol"+i);
            employeeDO.setAge(18);
            employeeDO.setGender(1);
            employeeDO.setIdNumber(i+"XX");
            employeeDO.setCreatTime(Calendar.getInstance().getTime());
            employeeDOList.add(employeeDO);
        }
        try {
            employeeBO.saveThread(employeeDOList);
            System.out.println("添加成功");
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

测试结果:
在这里插入图片描述
可以发现子线程组执行时,有一个线程执行失败,其他线程也会抛出异常,但是主线程中执行的删除操作,没有回滚,@Transactional注解没有生效。

4、使用sqlSession控制手动提交事务

 @Resource
  SqlContext sqlContext;
 /**
 * 测试多线程事务.
 * @param employeeDOList
 */
@Override
public void saveThread(List<EmployeeDO> employeeDOList) throws SQLException {
    // 获取数据库连接,获取会话(内部自有事务)
    SqlSession sqlSession = sqlContext.getSqlSession();
    Connection connection = sqlSession.getConnection();
    try {
        // 设置手动提交
        connection.setAutoCommit(false);
        //获取mapper
        EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);
        //先做删除操作
        employeeMapper.delete(null);
        //获取执行器
        ExecutorService service = ExecutorConfig.getThreadPool();
        List<Callable<Integer>> callableList  = new ArrayList<>();
        //拆分list
        List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        for (int i =0;i<lists.size();i++){
            if (i==lists.size()-1){
                atomicBoolean.set(false);
            }
            List<EmployeeDO> list  = lists.get(i);
            //使用返回结果的callable去执行,
            Callable<Integer> callable = () -> {
                //让最后一个线程抛出异常
                if (!atomicBoolean.get()){
                    throw new ServiceException("001","出现异常");
                }
              return employeeMapper.saveBatch(list);
            };
            callableList.add(callable);
        }
        //执行子线程
       List<Future<Integer>> futures = service.invokeAll(callableList);
        for (Future<Integer> future:futures) {
        //如果有一个执行不成功,则全部回滚
            if (future.get()<=0){
                connection.rollback();
                 return;
            }
        }
        connection.commit();
        System.out.println("添加完毕");
    }catch (Exception e){
        connection.rollback();
        log.info("error",e);
        throw new ServiceException("002","出现异常");
    }finally {
         connection.close();
     }
}
// sql
<insert id="saveBatch" parameterType="List">
 INSERT INTO
 employee (employee_id,age,employee_name,birth_date,gender,id_number,creat_time,update_time,status)
 values
     <foreach collection="list" item="item" index="index" separator=",">
     (
     #{item.employeeId},
     #{item.age},
     #{item.employeeName},
     #{item.birthDate},
     #{item.gender},
     #{item.idNumber},
     #{item.creatTime},
     #{item.updateTime},
     #{item.status}
         )
     </foreach>
 </insert>

数据库中一条数据

测试结果:抛出异常,
在这里插入图片描述
删除操作的数据回滚了,数据库中的数据依旧存在,说明事务成功了。
成功操作示例:

@Resource
SqlContext sqlContext;
/**
 * 测试多线程事务.
 * @param employeeDOList
 */
@Override
public void saveThread(List<EmployeeDO> employeeDOList) throws SQLException {
    // 获取数据库连接,获取会话(内部自有事务)
    SqlSession sqlSession = sqlContext.getSqlSession();
    Connection connection = sqlSession.getConnection();
    try {
        // 设置手动提交
        connection.setAutoCommit(false);
        EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);
        //先做删除操作
        employeeMapper.delete(null);
        ExecutorService service = ExecutorConfig.getThreadPool();
        List<Callable<Integer>> callableList  = new ArrayList<>();
        List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);
        for (int i =0;i<lists.size();i++){
            List<EmployeeDO> list  = lists.get(i);
            Callable<Integer> callable = () -> employeeMapper.saveBatch(list);
            callableList.add(callable);
        }
        //执行子线程
       List<Future<Integer>> futures = service.invokeAll(callableList);
        for (Future<Integer> future:futures) {
            if (future.get()<=0){
                connection.rollback();
                 return;
            }
        }
        connection.commit();
        System.out.println("添加完毕");
    }catch (Exception e){
        connection.rollback();
        log.info("error",e);
        throw new ServiceException("002","出现异常");
       // throw new ServiceException(ExceptionCodeEnum.EMPLOYEE_SAVE_OR_UPDATE_ERROR);
    }
}

测试结果:
在这里插入图片描述
数据库中数据:

删除的删除了,添加的添加成功了,测试成功。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/586773.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【算法小白周赛1A】分析 - 题解与代码

题目链接&#xff1a;https://www.starrycoding.com/problem/155 题目描述 小可可最近在学数学运算&#xff01;他希望考考你&#xff0c;给你两个整数 A , B A,B A,B&#xff0c;询问 A B A\times B AB 是否是偶数。 注意&#xff0c;可能存在前导 0 0 0&#xff0c;比如…

面试题-Redis篇

什么是 Redis? Redis 是完全开源免费的&#xff0c;遵守 BSD 协议&#xff0c;是一个高性能的 key-value 数据库。 Redis 与其他 key - value 缓存产品有以下三个特点&#xff1a; Redis 支持数据的持久化&#xff0c;可以将内存中的数据保存在磁盘中&#xff0c;重启的时 …

【C语言】指针篇-精通库中的快速排序算法:巧妙掌握技巧(4/5)

&#x1f308;个人主页&#xff1a;是店小二呀 &#x1f308;C语言笔记专栏&#xff1a;C语言笔记 &#x1f308;C笔记专栏&#xff1a; C笔记 &#x1f308;喜欢的诗句:无人扶我青云志 我自踏雪至山巅 文章目录 一、回调函数二、快速排序(Qsort)2.1 Qsort参数部分介绍2.2 不…

【Flutter】极光推送配置流程(小米厂商通道) 章二

前言 继【Flutter】极光推送配置流程(极光通道/华为厂商/IOS) 章一 并且&#xff0c;我大概率不会去修改第一篇文章的内容。 随着我自己在配置公司的项目的同时&#xff0c;我希望一直更新这个推送系列文章。 在章一配置完后&#xff0c;也是出现了一些问题&#xff0c;所以本…

【第1章】spring-mvc搭建

文章目录 前言一、准备二、搭建1.搭建2.项目结构 三、第一个Servlet程序1. jsp2. servlet3. 启动 总结 前言 Java已经进入了飞速发展的阶段&#xff0c;spring-mvc也发生了巨大的变化&#xff0c;最让人无法忍受的就是javax.servlet.* 变成了jakarta.servlet.* ps:虽然使用起来…

GPT-ArcGIS数据处理、空间分析、可视化及多案例综合应用

在数字化和智能化的浪潮中&#xff0c;GIS&#xff08;地理信息系统&#xff09;和GPT&#xff08;生成式预训练模型&#xff09;的结合正日益成为推动科研、城市规划、环境监测等领域发展的关键技术。GIS以其强大的空间数据处理、先进的空间分析工具、灵活的地图制作与可视化能…

吴恩达2022机器学习专项课程(一)7.2 逻辑回归的简化成本函数课后实验 Lab5

问题预览/关键词 二分类问题的训练集&#xff08;多特征&#xff09;绘制训练集数据的散点图自定义plot_data() Python实现逻辑回归的成本函数自定义sigmoid() 调用成本函数不同的w&#xff0c;b&#xff0c;绘制逻辑回归模型的决策边界验证哪条决策边界效果好总结 二分类问题的…

精通GDBus:Linux IPC的现代C接口

目录标题 1. GDBus介绍2. GDBus的优点3. 安装GDBus4. 使用GDBus连接到D-Bus总线实现D-Bus服务调用D-Bus方法发送和接收信号 5. 总结 在Linux环境下&#xff0c;不同的程序需要通过某种方式进行通信和协同工作。GDBus是GLib库的一部分&#xff0c;提供了一个基于GObject系统的、…

中科驭数受邀成为移动云智能芯片开放实验室首批成员企业

4月28日至29日&#xff0c;2024中国移动算力网络大会在苏州举行。大会以“算力网络点亮AI新时代”为主题&#xff0c;全面展示了中国移动最新算力网络成果与能力。中科驭数作为移动云智能芯片开放实验室首批合作伙伴&#xff0c;受邀参加入驻仪式&#xff0c;中科驭数高级副总裁…

OpenCV的图像矩(64)

返回:OpenCV系列文章目录&#xff08;持续更新中......&#xff09; 上一篇&#xff1a;OpenCV如何为等值线创建边界旋转框和椭圆(63) 下一篇 :OpenCV系列文章目录&#xff08;持续更新中......&#xff09; Image Moments&#xff08;图像矩&#xff09;是 OpenCV 库中的一个…

C语言----函数

1.函数的概念 函数&#xff1a;founction c语言的程序代码都是函数组成的 c语言中的函数就是一个完成某项特定的任务的一段代码&#xff0c;这段代码有特殊的写法和调用方法 c语言中我们一般见到两种函数&#xff1a; .库函数 .自定义函数 2.库函数 有对应的头文件 #i…

Python | Leetcode Python题解之第60题排列序列

题目&#xff1a; 题解&#xff1a; class Solution:def getPermutation(self, n: int, k: int) -> str:factorial [1]for i in range(1, n):factorial.append(factorial[-1] * i)k - 1ans list()valid [1] * (n 1)for i in range(1, n 1):order k // factorial[n - …

大数据分析与内存计算学习笔记

一、Scala编程初级实践 1.计算级数&#xff1a; 请用脚本的方式编程计算并输出下列级数的前n项之和Sn&#xff0c;直到Sn刚好大于或等于q为止&#xff0c;其中q为大于0的整数&#xff0c;其值通过键盘输入。&#xff08;不使用脚本执行方式可写Java代码转换成Scala代码执行&a…

【面试经典 150 | 回溯】单词搜索

文章目录 写在前面Tag题目来源解题思路方法一&#xff1a;回溯 写在最后 写在前面 本专栏专注于分析与讲解【面试经典150】算法&#xff0c;两到三天更新一篇文章&#xff0c;欢迎催更…… 专栏内容以分析题目为主&#xff0c;并附带一些对于本题涉及到的数据结构等内容进行回顾…

C语言实验-循环结构和选择结构

一&#xff1a; 求和:1(14)(149)(14916)…(14916…n2)? 其中n的值由键盘输入&#xff1b; #define _CRT_SECURE_NO_WARNINGS #include<stdio.h>int main() {int sum 0;int n 0;printf("请输入一个整数");scanf("%d", &n);for (int i 0; i &l…

【记录】Python3| 将 PDF 转换成 HTML/XML(✅⭐pdfminer.six)

本文将会被汇总至 【记录】Python3&#xff5c;2024年 PDF 转 XML 或 HTML 的第三方库的使用方式、测评过程以及对比结果&#xff08;汇总&#xff09;&#xff0c;更多其他工具请访问该文章查看。 注意&#xff01;pdfminer.six 和 pdfminer3k 不是同一个&#xff01;&#xf…

Java 写一个死锁的例子

public class DeadLock {public static void main(String[] args) {Object lock1 new Object();Object lock2 new Object();new Thread(new A(lock1,lock2),"线程A").start();new Thread(new B(lock1,lock2),"线程B").start();} }class A implements Run…

JAVAEE—servlet的概念及使用,使用servlet接口实现一个表白墙

文章目录 servlet的概念静态页面和动态页面servlet的作用 写出一个servlet程序目录的创建设置smart tomcat编写helloworld servlet的概念 首先我们要搞明白什么是servlet&#xff0c;servlet是一种实现动态页面的技术&#xff0c;他是由tomcat提供给程序员的一组API可以帮助程…

新版多功能在线生成收款码系统源码

相信大家已经听说过收款码三合一这个概念&#xff0c;并且在很多场景中都看到过商家开始使用这样的收款码。前台放置着一个二维码&#xff0c;上边写着“支付宝、微信、QQ扫码付款”&#xff0c;不管使用哪个软件扫码&#xff0c;都能正确识别。但是&#xff0c;我们平台发现使…

Linux 的静态库和动态库

本文目录 一、静态库1. 创建静态库2. 静态库的使用 二、动态库1. 为什么要引入动态库呢&#xff1f;2. 创建动态库3. 动态库的使用4. 查看可执行文件依赖的动态库 一、静态库 在编译程序的链接阶段&#xff0c;会将源码汇编生成的目标文件.o与引用到的库&#xff08;包括静态库…
最新文章