一、业务场景
我们在工作中经常会到往数据库里插入大量数据的工作,但是既需要保证数据的一致性,又要保证程序执行的效率。因此需要在多线程中使用事务,这样既可以保证数据的一致性,又能保证程序的执行效率。但是spring自带的@Transactional注解无法满足多线程间的事务一致性,因为这几个事务执行的线程不同,无法保持数据的一致性。
二、解决方案
我的解决方案参考分布式事务2PC(Two-phase commit protocol),各个线程需要等待所有的线程执行完成后才能进行下一步操作,在使用线程池执行任务时,如果线程池的最大线程数小于任务列表的数量,就会发生“死锁”,即获取到线程的任务阻塞等待没有获取线程的任务执行完成,而没有获取线程的任务会在阻塞队列中等待空闲线程的调用。这种情况需要使用一阶段的超时机制来“解开”,超时机制会发送回滚命令,线程池收到后进行回滚,但这种情况任务始终无法提交,再次提交结果依然是等到超时再回滚。再使用中需要结合具体业务来对线程池参数以及数据库连接池参数进行合理的设置。如果这里听的优点迷,可以先看下面具体代码实现再来结合这段文字思考。
1、工具类代码:
import lombok.extern.slf4j.Slf4j;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author poxiao
* @create 2023-01-05 22:22
* <p>
* 多线程事务管理器
* 基于分布式事务思想,采用2PC(Two-phase commit protocol)协议
* 解决基于线程池的多线程事务一致性问题
*/
@Slf4j
public class MultiThreadingTransactionManager {
/**
* 事务管理器
*/
private final PlatformTransactionManager transactionManager;
/**
* 超时时间
*/
private final long timeout;
/**
* 时间单位
*/
private final TimeUnit unit;
/**
* 一阶段门闩,(第一阶段的准备阶段),当所有子线程准备完成时(除“提交/回滚”操作以外的工作都完成),countDownLatch的值为0
*/
private CountDownLatch oneStageLatch = null;
/**
* 二阶段门闩,(第二阶段的执行执行),主线程将不再等待子线程执行,直接判定总的任务执行失败,执行第二阶段让等待确认的线程进行回滚
*/
private final CountDownLatch twoStageLatch = new CountDownLatch(1);
/**
* 是否提交事务,默认是true(当任一线程发生异常时,isSubmit会被设置为false,即回滚事务)
*/
private final AtomicBoolean isSubmit = new AtomicBoolean(true);
/**
* 构造方法
* @param transactionManager 事务管理器
* @param timeout 超时时间
* @param unit 时间单位
*/
public MultiThreadingTransactionManager(PlatformTransactionManager transactionManager, long timeout, TimeUnit unit) {
this.transactionManager = transactionManager;
this.timeout = timeout;
this.unit = unit;
}
/**
* 线程池方式执行任务,可保证线程间的事务一致性
* @param runnableList 任务列表
* @param executor 线程池
* @return
*/
public boolean execute(List<Runnable> runnableList, Executor executor) {
// 排除null值
runnableList.removeAll(Collections.singleton(null));
// 属性初始化
innit(runnableList.size());
// 遍历任务列表并放入线程池
for (Runnable runnable : runnableList) {
// 创建线程
Thread thread = new Thread() {
@Override
public void run() {
// 如果别的线程执行失败,则该任务就不需要再执行了
if (!isSubmit.get()) {
log.info("当前子线程执行中止,因为线程事务中有子线程执行失败");
oneStageLatch.countDown();
return;
}
// 开启事务
TransactionStatus transactionStatus = transactionManager.getTransaction(new DefaultTransactionDefinition());
try {
// 执行业务逻辑
runnable.run();
} catch (Exception e) {
// 执行体发生异常,设置回滚
isSubmit.set(false);
log.error("线程{}:业务发生异常,执行体:{}", Thread.currentThread().getName(), runnable);
}
// 计数器减一
oneStageLatch.countDown();
try {
//等待所有线程任务完成,监控是否有异常,有则统一回滚
twoStageLatch.await();
// 根据isSubmit值判断事务是否提交,可能是子线程出现异常,也有可能是子线程执行超时
if (isSubmit.get()) {
// 提交
transactionManager.commit(transactionStatus);
log.info("线程{}:事务提交成功,执行体:{}", Thread.currentThread().getName(), runnable);
} else {
// 回滚
transactionManager.rollback(transactionStatus);
log.info("线程{}:事务回滚成功,执行体:{}", Thread.currentThread().getName(), runnable);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
executor.execute(thread);
}
/**
* 主线程担任协调者,当第一阶段所有参与者准备完成,oneStageLatch的计数为0
* 主线程发起第二阶段,执行阶段(提交或回滚),根据
*/
try {
// 主线程等待所有线程执行完成,超时时间设置为五秒
oneStageLatch.await(timeout, unit);
long count = oneStageLatch.getCount();
System.out.println("countDownLatch值:" + count);
// 主线程等待超时,子线程可能发生长时间阻塞,死锁
if (count > 0) {
// 设置为回滚
isSubmit.set(false);
log.info("主线线程等待超时,任务即将全部回滚");
}
twoStageLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 返回结果,是否执行成功,事务提交即为执行成功,事务回滚即为执行失败
return isSubmit.get();
}
/**
* 初始化属性
* @param size 任务数量
*/
private void innit(int size) {
oneStageLatch = new CountDownLatch(size);
}
}
2、业务代码:
(1)线程池参数
我这里采用自定义线程池,线程池参数如下:
@Configuration
public class ThreadPoolConfig {
// 获取服务器的cpu个数
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();// 获取cpu个数
private static final int COUR_SIZE = CPU_COUNT * 4;
private static final int MAX_COUR_SIZE = CPU_COUNT * 8;
// 接下来配置一个bean,配置线程池。
@Bean
public Executor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(COUR_SIZE);// 设置核心线程数
threadPoolTaskExecutor.setMaxPoolSize(MAX_COUR_SIZE);// 配置最大线程数
threadPoolTaskExecutor.setQueueCapacity(MAX_COUR_SIZE * 4);// 配置队列容量(这里设置成最大线程数的四倍)
threadPoolTaskExecutor.setThreadNamePrefix("thirdParty-thread");// 给线程池设置名称
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 设置任务的拒绝策略
return threadPoolTaskExecutor;
}
}
(2)任务业务正常,无异常抛出时正常提交事务情况
public Result<?> testTransaction() throws SQLException {
List<User> users = new LinkedList<>();
User user = new User();
user.setName("1111");
users.add(user);
User user1 = new User();
user1.setName("2222");
users.add(user1);
MultiThreadingTransactionManager multiThreadingTransactionManage = new MultiThreadingTransactionManager(transactionManager, 60, TimeUnit.SECONDS);
List<Runnable> runnableList = new ArrayList<>();
users.forEach((x) -> {
runnableList.add(new Runnable() {
@Override
public void run() {
System.out.println("当前线程:" + Thread.currentThread().getName() + "插入数据:" + x);
secondUserMapper.insertUser(x);
}
});
});
multiThreadingTransactionManage.execute(runnableList, threadPoolTaskExecutor);
return Result.success(1);
}
执行时的日志:
执行成功后数据库多次了两条数据
(3)展示出现异常任务时回滚事务情况
public Result<?> testTransaction() throws SQLException {
List<User> users = new LinkedList<>();
User user = new User();
user.setName("1111");
users.add(user);
User user1 = new User();
user1.setName("2222");
users.add(user1);
MultiThreadingTransactionManager multiThreadingTransactionManage = new MultiThreadingTransactionManager(transactionManager, 60, TimeUnit.SECONDS);
List<Runnable> runnableList = new ArrayList<>();
//模拟任务出现异常
runnableList.add(() -> {
int a = 10 / 0;
});
users.forEach((x) -> {
runnableList.add(new Runnable() {
@Override
public void run() {
System.out.println("当前线程:" + Thread.currentThread().getName() + "插入数据:" + x);
secondUserMapper.insertUser(x);
}
});
});
multiThreadingTransactionManage.execute(runnableList, threadPoolTaskExecutor);
return Result.success(1);
}
执行时的日志:
数据库没有新增的数据
参考文章:
Spring多线程事务解决方案-CSDN博客
两阶段VS三阶段提交协议_两阶段提交-CSDN博客
详解Spring多线程下如何保证事务的一致性-51CTO.COM
多线程结合sprongboot事务(完善)_springboot多线程事务-CSDN博客