项目中用到了多线程去批量处理一些数据,当时想当然认为只要方法上加上@Transactional注解就好了,实际并未达到想要的处理效果。特此去学习了下关于多线程事务回滚相关方案,参考了网上其他资料,这里整理并记录下学习历程。
站在巨人的肩膀上,我们可以看的更远!
多线程事务怎么回滚?
- 一、准备相关基础方法
- 1.线程池配置
- 2.list切分工具类
- 3.SqlSession工具类
- 4.员工实体类
- 5.员工EmployeeMapper
- 6.员工对应EmployeeMapper.xml
- 二、业务处理
- 1.EmployeeService接口
- 2.测试多线程事务实现类
- 3.员工Controller
- 三、方案验证
- 1.数据库表Employee存储1条原始数据,用于验证数据删除后是否被回滚。![在这里插入图片描述](https://img-blog.csdnimg.cn/cb341ba2f3e146e69dc3e913f1b411f8.png)
- 2.EmployeeServiceImpl的saveThreadByTransactional方法
- 3.EmployeeServiceImpl的saveThreadRollBack方法
- 四、方案总结
- 1.方案总结
- 五.项目结构及下载
一、准备相关基础方法
这里以多线程、分批次插入数据库employee表为例子进行演示。
1.线程池配置
/**
* 线程池配置
*/
@Component
public class ExecutorConfig {
private static int maxPoolSize = Runtime.getRuntime().availableProcessors();
private volatile static ExecutorService executorService;
public static ExecutorService getThreadPool() {
if (executorService == null){
synchronized (ExecutorConfig.class){
if (executorService == null){
executorService = newThreadPool();
}
}
}
return executorService;
}
private static ExecutorService newThreadPool(){
int queueSize = 1000;
int corePool = Math.min(10, maxPoolSize);
return new ThreadPoolExecutor(corePool, maxPoolSize, 10000L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(queueSize),new ThreadPoolExecutor.AbortPolicy());
}
private ExecutorConfig(){}
}
2.list切分工具类
/**
* list切分工具类
*/
public class ListUtil {
/**
* 平均拆分list
*
* @param source
* @param n
* @param <T>
* @return
*/
public static <T> List<List<T>> AverageList(List<T> source, int n) {
List<List<T>> result = new ArrayList<>();
int remaider = source.size() % n;
int number = source.size() / n;
//偏移量
int offset = 0;
for (int i = 0; i < n; i++) {
List<T> value;
if (remaider > 0) {
value = source.subList(i * number + offset, (i + 1) * number + offset + 1);
remaider--;
offset++;
} else {
value = source.subList(i * number + offset, (i + 1) * number + offset);
}
result.add(value);
}
return result;
}
}
3.SqlSession工具类
/**
* SqlSession工具类
*/
@Component
public class SqlContext {
@Resource
private SqlSessionTemplate sqlSessionTemplate;
public SqlSession getSqlSession(){
SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory();
return sqlSessionFactory.openSession();
}
}
4.员工实体类
/**
* 员工
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@TableName(value = "employee")
public class Employee {
@TableField(value = "employee_id")
private Integer employeeId;
@TableField(value = "employee_name")
private String employeeName;
@TableField(value = "age")
private Integer age;
}
5.员工EmployeeMapper
@Repository
public interface EmployeeMapper extends BaseMapper<Employee> {
int saveBatchRollBack(List Employee);
}
6.员工对应EmployeeMapper.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.it.mapper.EmployeeMapper">
<resultMap id="BaseResultMap" type="com.it.entity.Employee">
<!--@Table `Employee`-->
<result column="employee_id" jdbcType="INTEGER" property="employee_id" />
<result column="employee_name" jdbcType="VARCHAR" property="employee_name" />
<result column="age" jdbcType="INTEGER" property="age" />
</resultMap>
<sql id="Base_Column_List">
employee_id, employee_name, age
</sql>
<insert id="saveBatchRollBack">
insert into
employee (employee_id,age,employee_name)
values
<foreach collection="list" item="item" index="index" separator=",">
(
#{item.employeeId},
#{item.age},
#{item.employeeName}
)
</foreach>
</insert>
</mapper>
二、业务处理
1.EmployeeService接口
public interface EmployeeService extends IService<Employee> {
/**
* 使用@Transactional测试多线程回滚失败
*/
void saveThreadByTransactional(List<Employee> employeeList);
/**
* 使用手动操作事务测试多线程回滚成功
*/
void saveThreadRollBack(List<Employee> employeeList) throws SQLException;
}
2.测试多线程事务实现类
/**
* 测试多线程事务
*/
@Service
@Slf4j
public class EmployeeServiceImpl extends ServiceImpl<EmployeeMapper, Employee> implements EmployeeService {
@Resource
SqlContext sqlContext;
/**
* 多线程环境下Transactional失效场景
*
* @param employeeList
*/
@Override
@Transactional(rollbackFor = Exception.class)
public void saveThreadByTransactional(List<Employee> employeeList) {
try {
// 先做删除操作,如果子线程出现异常,此操作不会回滚
this.getBaseMapper().delete(null);
// 获取线程池
ExecutorService executorService = ExecutorConfig.getThreadPool();
// 拆分数据,拆分6份
List<List<Employee>> lists = ListUtil.AverageList(employeeList, 6);
// 执行的线程
Thread[] threadArray = new Thread[lists.size()];
// 监控子线程执行完毕,再执行主线程,要不然会导致主线程关闭,子线程也会随着关闭
CountDownLatch countDownLatch = new CountDownLatch(lists.size());
AtomicBoolean atomicBoolean = new AtomicBoolean(true);
for (int i = 0; i < lists.size(); i++) {
if (i == lists.size() - 1) {
// 最后一个atomicBoolean设置为false
atomicBoolean.set(false);
}
List<Employee> list = lists.get(i);
threadArray[i] = new Thread(() -> {
try {
// 最后一个线程抛出异常
if (!atomicBoolean.get()) {
throw new RuntimeException("最后一个线程添加时抛出异常");
}
//批量添加,mybatisPlus中自带的batch方法
this.saveBatch(list);
} finally {
countDownLatch.countDown();
}
});
}
for (int i = 0; i < lists.size(); i++) {
executorService.execute(threadArray[i]);
}
// 当子线程执行完毕时,主线程再往下执行
countDownLatch.await();
System.out.println("employee列表添加完成");
} catch (Exception e) {
log.info("error", e);
throw new RuntimeException("employee列表添加过程出现异常");
}
}
/**
* 使用sqlSession控制手动提交事务
*
* @param employeeList
*/
@Override
public void saveThreadRollBack(List<Employee> employeeList) throws SQLException {
{
// 获取数据库连接,获取会话(内部自有事务)
SqlSession sqlSession = sqlContext.getSqlSession();
Connection connection = sqlSession.getConnection();
try {
// 设置手动提交
connection.setAutoCommit(false);
//获取mapper
EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);
//先做删除操作
employeeMapper.delete(null);
//获取执行器
ExecutorService service = ExecutorConfig.getThreadPool();
List<Callable<Integer>> callableList = new ArrayList<>();
//拆分list
List<List<Employee>> lists = ListUtil.AverageList(employeeList, 6);
AtomicBoolean atomicBoolean = new AtomicBoolean(true);
for (int i = 0; i < lists.size(); i++) {
if (i == lists.size() - 1) {
atomicBoolean.set(false);
}
List<Employee> list = lists.get(i);
//使用返回结果的callable去执行,
Callable<Integer> callable = () -> {
//让最后一个线程抛出异常
if (!atomicBoolean.get()) {
throw new Exception("出现异常");
}
return employeeMapper.saveBatchRollBack(list);
};
callableList.add(callable);
}
//执行子线程
List<Future<Integer>> futures = service.invokeAll(callableList);
for (Future<Integer> future : futures) {
//如果有一个执行不成功,则全部回滚
if (future.get() <= 0) {
connection.rollback();
return;
}
}
connection.commit();
System.out.println("添加完毕");
} catch (Exception e) {
connection.rollback();
log.info("error", e);
} finally {
connection.close();
}
}
}
3.员工Controller
@RestController
@RequestMapping(value = "/employee")
public class EmployeeController {
@Autowired
EmployeeService employeeService;
@PostMapping("/saveThreadByTransactional")
public ResponseEntity saveThreadByTransactional() {
// 模拟需要插入12名员工到数据库
List<Employee> list = IntStream.range(0, 12)
.mapToObj(i -> {
Employee employee = new Employee();
employee.setEmployeeId(i);
employee.setEmployeeName("三丰" + i);
employee.setAge(i + 100);
return employee;
})
.collect(Collectors.toList());
employeeService.saveThreadByTransactional(list);
return new ResponseEntity<>(HttpStatus.OK);
}
@PostMapping("/saveThreadRollBack")
public ResponseEntity saveThreadRollBack() throws SQLException {
// 模拟需要插入12名员工到数据库
List<Employee> list = IntStream.range(0, 12)
.mapToObj(i -> {
Employee employee = new Employee();
employee.setEmployeeId(i);
employee.setEmployeeName("三丰" + i);
employee.setAge(i + 100);
return employee;
})
.collect(Collectors.toList());
employeeService.saveThreadRollBack(list);
return new ResponseEntity<>(HttpStatus.OK);
}
}
三、方案验证
1.数据库表Employee存储1条原始数据,用于验证数据删除后是否被回滚。
2.EmployeeServiceImpl的saveThreadByTransactional方法
该方法通过使用@Transactional注解尝试处理多线程事务回滚。
利用postman测试saveThreadByTransactional接口
发现控制台显示我们自定义的线程报错
查询数据库Employee表,发现代码中this.getBaseMapper().delete(null);
可以发现子线程组执行时,有一个线程执行失败,其他线程也会抛出异常,但是主线程中执行的删除操作,没有回滚(数据库中表数据也已经被删除完成),则证明@Transactional注解并不能在多线程下进行事务回滚!
3.EmployeeServiceImpl的saveThreadRollBack方法
该方法通过使用sqlSession控制,手动提交事务,在多线程下进行事务回滚。
利用postman测试saveThreadRollBack接口。
发现控制台显示我们自定义的线程报错。
查询数据库Employee表,发现数据并未被删除,证明多线程执行过程中失败了,事务被回滚了。
四、方案总结
1.方案总结
在Spring中可以使用@Transactional注解去控制事务,使出现异常时会进行回滚,在多线程中,这个注解则不会生效。
如果主线程需要先执行一些修改数据库的操作,当子线程在进行处理出现异常时,主线程修改的数据则不会回滚,导致数据错误。
通过使用sqlSession控制手动提交事务,可以达到主线程和子线程数据事务回滚。
五.项目结构及下载
源码地址springboot-cacheable,创作不易,欢迎star哦~
参考资料
支付宝一面:多线程事务怎么回滚?说用 @Transactional 可以回去等通知了!
多线程事务怎么回滚?
多线程如何实现事务回滚?一招帮你搞定!