网站开发工具百度推广代理加盟
一、问题复现
批量插入时,使用多线程对插入数据实现分批插入,在service层使用@Transactional注解,对应方法中线程池中开辟的子线程抛出异常时,没有回滚事务。
二、原因分析
事务管理范围不正确:@Transactional注解仅对当前方法有效,如果在方法内创建新的线程或使用线程池等异步操作,该方法之外的代码将无法受到事务的管理。因此,在使用多线程进行批量操作时,需要确保整个批量操作处于同一事务管理范围内。
Spring事务和Java线程池机制的互动问题:在使用ThreadPoolExecutor进行批量操作时,线程池中的线程和Spring管理的事务并不是同一个线程,这可能会导致事务管理器感知不到线程中的异常,从而导致事务未能回滚。
三、解决办法
弃用注解样事务,改为手动管理事务。
复制代码
SqlSession sqlSession = SpringContextUtils.getBean(SqlSessionTemplate.class).getSqlSessionFactory() .openSession();Connection connection = sqlSession.getConnection();OfflineExpressRecordExtMapper extMapper = sqlSession.getMapper(OfflineExpressRecordExtMapper.class);// 批量插入int taskCount = (int) Math.ceil((double) beanList.size() / THREAD_HANDLE);ThreadPoolExecutor executor = SpringContextUtils .getBean("offlineExpressRecordThreadPoolExecutor", ThreadPoolExecutor.class);
try {
connection.setAutoCommit(false);
ArrayList<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < taskCount; i++) {
int start = i * THREAD_HANDLE;
int end = (i + 1) * THREAD_HANDLE > beanList.size() ? beanList.size() : (i + 1) * THREAD_HANDLE;
List<OfflineExpressRecord> threadHandleList = beanList.subList(start, end);
Future<?> task = executor.submit(() -> extMapper.saveBatch(threadHandleList));
futures.add(task);
}// 等待插入完成,检验异常
for (Future<?> future : futures) {
future.get();
}connection.commit();} catch (Exception e) {log.error("批量导入存储数据过程中出现异常", e);connection.rollback();throw e;
} finally {connection.close();}