场景 :在普通的开发的时候,任务是单线程处理的,这这时候性能可能有点慢。基于 juc 包下的ThreadPoolExecutor 进行开发,可以转换成为批处理的,使性能成倍提高
出现主要的问题 :将任务切割成为子任务的时候,事务统一性被破坏。
springboot:2.2.0.RELEASE
flowable:6.4.2
分析步骤:
Step1. ThreadPoolExecutor 的基本用法,编写通用工具类
Step2. 基于面向接口开发,进行通用抽象
Step3. 分析spring事务,将基于注解的声明式事务,改为编程式事务
Step4. 使用 变量表示来决定是否使用统一事务Step1: ThreadPoolExecutor 简单用法
基本处理代码
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("flow-pool-%d") .build(); int corePoolSize = 10; int maximumPoolSize = 10; long keepAliveTime = 3; TimeUnit unit = TimeUnit.SECONDS; BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(); ExecutorService executorService = new ThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, namedThreadFactory) { @Override protected void beforeExecute(Thread t, Runnable r) { // 线程处理前置方法 @Override protected void afterExecute(Runnable r, Throwable t) { // 线程处理后置方法 List<Future<?>> futures = new ArrayList<>(); for (int i = 0; i < group; i++) { int startIndex = i * groupSize; int endIndex = (i + 1) * groupSize; if (endIndex > toDoList.size()) { endIndex = toDoList.size(); List<?> items = toDoList.subList(startIndex, endIndex); futures.add(executorService.submit(new SingleTransactionPoolTask(execution, items, flag))); try { for (Future<?> future : futures) { future.get(); } catch (Exception e) { e.printStackTrace(); // 业务操作 } finally { executorService.shutdown();在进行主线程拆分成多子线程并发处理的时候,经常会遇到部分主线程的数据无法在子线程获取到,此时就可以通过重写线程池 beforeExecute() 方法,将主线程数据同步到子线程中。如:工作流的 Authentication.setAuthenticatedUserId(currentUserId);
基于ThreadLocal 的全局变量设置线程池调用任务
此处为线程池实际处理方法,ExecutionService.submit(Runnable task);
* Submits a Runnable task for execution and returns a Future * representing that task. The Future's {@code get} method will * return {@code null} upon <em>successful</em> completion. * @param task the task to submit * @return a Future representing pending completion of the task * @throws RejectedExecutionException if the task cannot be * scheduled for execution * @throws NullPointerException if the task is null Future<?> submit(Runnable task);SingleTransactionPoolTask 实现 runnable 接口
public class SingleTransactionPoolTask implements Runnable { private final ThreadExecution threadExecution; private final List<?> list; private final BatchTransactionFlag flag; public SingleTransactionPoolTask(ThreadExecution threadExecution, List<?> list, BatchTransactionFlag flag) { this.threadExecution = threadExecution; this.list = list; this.flag = flag; @Override public void run() { try { threadExecution.threadExecute(list); } finally { flag.getCompleteThreads().incrementAndGet();返回线程调用的 处理方法
主要进行子线程中是否有异常,如果具有异常则应该进行的对应业务处理try { for (Future<?> future : futures) { future.get(); } catch (Exception e) { e.printStackTrace(); // 业务操作 } finally { executorService.shutdown();Step2: 基于面向接口开发,将业务操作进行多态
ThreadExecution 抽象子任务接口,具体不同业务编写指定的实现类,形成多态。通用工具类统一调用接口
public interface ThreadExecution { * 处理线程任务 * @param list void threadExecute(List<?> list);SingleTransactionPoolTask 通用任务实现类,基于 依赖倒置原则 调用 ThreadExecution
public class SingleTransactionPoolTask implements Runnable { private final ThreadExecution threadExecution; private final List<?> list; private final BatchTransactionFlag flag; public SingleTransactionPoolTask(ThreadExecution threadExecution, List<?> list, BatchTransactionFlag flag) { this.threadExecution = threadExecution; this.list = list; this.flag = flag; @Override public void run() { try { threadExecution.threadExecute(list); } finally { flag.getCompleteThreads().incrementAndGet();实现 ThreadExecution 接口,进行业务多态
BatchStartProcessThreadExecutionImpl@Slf4j public class BatchStartProcessThreadExecutionImpl implements ThreadExecution { private RuntimeService runtimeService; private List<BatchStartProcessInstanceRsp.ProcessInstanceItem> records; public BatchStartProcessThreadExecutionImpl(List<BatchStartProcessInstanceRsp.ProcessInstanceItem> records) { this.records = records; this.runtimeService = SpringContextUtils.getBean(RuntimeService.class); @Override public void threadExecute(List list) { // 省略业务代码BatchTaskCompleteTaskWithBatchTransactionThreadExecutionImpl
@Slf4j public class BatchTaskCompleteTaskWithBatchTransactionThreadExecutionImpl implements ThreadExecution { private List<BatchCompleteTaskRsp.CompleteTaskItem> result; private FlowTaskService flowTaskService; public BatchTaskCompleteTaskWithBatchTransactionThreadExecutionImpl(List<BatchCompleteTaskRsp.CompleteTaskItem> result) { this.result = result; this.flowTaskService = SpringContextUtils.getBean(FlowTaskService.class); @Override public void threadExecute(List list) { // 省略业务代码Step3. 分析spring事务,将基于注解的声明式事务,改为编程式事务
在进行spring开发的时候,基本都是基于spring的声明式事务(@Transactional)进行开发,可以做到非常高效。但是基于多线程开发的时候,通过debug,可以发现,主线程还没有进行异常处理环节,子线程事务已经提交,并且在数据库已经可以查询到。 这个并不满足于业务需求。(如图)
基于对Spring事务bean之间关系的了解,事务都是围绕着 TransactionManager,实现类为:org.springframework.jdbc.datasource.DataSourceTransactionManager,可以找到接口org.springframework.transaction.PlatformTransactionManager,并且该接口具有如下的方法
PlatformTransactionManager接口的方法
所以获取事务的代码则为
// 获取事务 TransactionStatus transactionStatus = transactionManager.getTransaction(TransactionDefinition.withDefaults()); // 提交事务 transactionManager.commit(transactionStatus); // 回滚事务 transactionManager.rollback(transactionStatus);所以原本计划是:根据传入参数,把事务从子线程中获取,再回到主线程中提交。不过事与愿违的是,提交事务并没有想象中那么直接。会抛出异常
DefaultCommonThreadExecutionServiceBean
@Service public class DefaultCommonThreadExecutionServiceBean implements CommonThreadExecutionService { @Resource private DataSourceTransactionManager transactionManager; @Override // @Transactional(rollbackFor = Exception.class) public int executeBatch(ThreadExecution threadExecution, List<?> sequence, List<TransactionStatus> transactionStatusList) { TransactionStatus transactionStatus = transactionManager.getTransaction(TransactionDefinition.withDefaults()); transactionStatusList.add(transactionStatus); threadExecution.threadExecute(sequence); return 0;FlowThreadPoolExecutor 代码段
DataSourceTransactionManager transactionManager = SpringContextUtils.getBean(DataSourceTransactionManager.class); try { for (Future future : futures) { future.get(); transactionStatusList.forEach(obj -> { transactionManager.commit(obj); } catch (Exception e) { e.printStackTrace(); transactionStatusList.forEach(obj -> { transactionManager.rollback(obj); } finally { executorService.shutdown();根据spring事务源码分析可知,spring的事务也是基于ThreadLocal的,所以出现了跨越线程的时候,就会出现无法执行完成。并且由navicat无法操作数据库可以看出,数据库事务并未提交,出现了行锁。
org.springframework.transaction.support.TransactionSynchronizationManager#unbindResource
* Unbind a resource for the given key from the current thread. * @param key the key to unbind (usually the resource factory) * @return the previously bound value (usually the active resource object) * @throws IllegalStateException if there is no value bound to the thread * @see ResourceTransactionManager#getResourceFactory() public static Object unbindResource(Object key) throws IllegalStateException { Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); Object value = doUnbindResource(actualKey); if (value == null) { throw new IllegalStateException( "No value for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]"); return value;org.springframework.transaction.support.TransactionSynchronizationManager
虽然源码中可以获取看到事务提交代码是 数据库连接的提交,但是其中还是必须执行 清除当前线程绑定的事务,才能彻底释放数据库连接。
提交事务:org.springframework.jdbc.datasource.DataSourceTransactionManager#doCommit
@Override protected void doCommit(DefaultTransactionStatus status) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction(); Connection con = txObject.getConnectionHolder().getConnection(); if (status.isDebug()) { logger.debug("Committing JDBC transaction on Connection [" + con + "]"); try { con.commit(); catch (SQLException ex) { throw new TransactionSystemException("Could not commit JDBC transaction", ex);抛出异常:org.springframework.transaction.support.AbstractPlatformTransactionManager#cleanupAfterCompletion
* Clean up after completion, clearing synchronization if necessary, * and invoking doCleanupAfterCompletion. * @param status object representing the transaction * @see #doCleanupAfterCompletion private void cleanupAfterCompletion(DefaultTransactionStatus status) { status.setCompleted(); if (status.isNewSynchronization()) { TransactionSynchronizationManager.clear(); if (status.isNewTransaction()) { doCleanupAfterCompletion(status.getTransaction()); if (status.getSuspendedResources() != null) { if (status.isDebug()) { logger.debug("Resuming suspended transaction after completion of inner transaction"); Object transaction = (status.hasTransaction() ? status.getTransaction() : null); resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());所以最终解决还是需要在子线程进行提交,此时,又可以使用线程池的重写 java.util.concurrent.ThreadPoolExecutor#afterExecute
并且通过变量来确定子线程是否已经执行完成,如果执行完成,才进行事务的提交BatchTransactionFlag
@Getter public class BatchTransactionFlag { private final AtomicInteger completeThreads = new AtomicInteger(); private final AtomicInteger successThreads = new AtomicInteger(); private final int groupSize; private boolean batchTransaction; private Map<Long, TransactionStatus> longTransactionStatusMap; private final List<?> toDoList; public BatchTransactionFlag(int groupSize, boolean batchTransaction, List<?> toDoList) { this.groupSize = groupSize; this.batchTransaction = batchTransaction; this.toDoList = toDoList; if (batchTransaction) { longTransactionStatusMap = new ConcurrentHashMap<>();CommonThreadExecutionService实现
@Slf4j @Service public class DefaultCommonThreadExecutionServiceBean implements CommonThreadExecutionService { @Resource private DataSourceTransactionManager transactionManager; @Override public int executeBatch(ThreadExecution threadExecution, List sequence, Map<Long, TransactionStatus> longTransactionStatusMap, BatchTransactionFlag flag) { synchronized (flag) { TransactionStatus transactionStatus = transactionManager.getTransaction(TransactionDefinition.withDefaults()); longTransactionStatusMap.put(Thread.currentThread().getId(), transactionStatus); try { threadExecution.threadExecute(sequence); flag.getSuccessThreads().incrementAndGet(); } finally { flag.getCompleteThreads().incrementAndGet(); log.info("完成任务:" + Thread.currentThread().getName()); return 0;经过测试发现,需要调用数据库修改的步骤,还是需要同步块的,不使用会导致数据库死锁,导致处理超时
Step4. 使用 变量表示来决定是否使用统一事务
从上面可以看到由于面向接口进行处理,所以根据需要 统一事务 跟 不需要统一事务 又可以使用不同实现类来进行控制,并且在编写线程池的时候也配合做判断。
线程池执行的代码
for (int i = 0; i < group; i++) { int startIndex = i * groupSize; int endIndex = (i + 1) * groupSize; if (endIndex > toDoList.size()) { endIndex = toDoList.size(); List<?> items = toDoList.subList(startIndex, endIndex); if (batchTransaction) { futures.add(executorService.submit(new BatchTransactionPoolTask(execution, items, flag.getLongTransactionStatusMap(), flag))); } else { futures.add(executorService.submit(new SingleTransactionPoolTask(execution, items, flag)));线程池的构建
private static ThreadPoolExecutor createThreadPoolExecutorInstance(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, BatchTransactionFlag flag ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("flow-pool-%d") .build(); String currentUserId = SecurityUtils.getCurrentUserId(); DataSourceTransactionManager transactionManager = SpringContextUtils.getBean(DataSourceTransactionManager.class); return new ThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, namedThreadFactory) { @Override protected void beforeExecute(Thread t, Runnable r) { Authentication.setAuthenticatedUserId(currentUserId); @Override protected void afterExecute(Runnable r, Throwable t) { if (flag.isBatchTransaction()) { try { while (flag.getCompleteThreads().get() != flag.getGroupSize()) { log.info(Thread.currentThread().getName() + " 等待主线程:getGroupSize:" + flag.getGroupSize() + "\tgetCompleteThreads:" + flag.getCompleteThreads().get()); log.info("开启事务个数:" + flag.getLongTransactionStatusMap().size()); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); TransactionStatus status = flag.getLongTransactionStatusMap().get(Thread.currentThread().getId()); if (flag.getSuccessThreads().get() == flag.getCompleteThreads().get()) { log.info(Thread.currentThread().getName() + ":全部执行成功,提交事务"); transactionManager.commit(status); } else { log.info(Thread.currentThread().getName() + ":具有线程执行失败,回滚事务"); transactionManager.rollback(status);