审校 | 梁策 孙淑娟
作为开发人员,我们习惯于通过在public方法上添加@Transactional 注解来实现事务管理。大多数情况下,把事务的启动、提交或者回滚全部交给Spring框架操作非常便捷,但如果认为这就是事务管理的全部,那就有失偏颇了。
Spring的确可负责事务管理的所有底层实现细节,而且不管你用的是什么持久层框架,如Hibernate、MyBatis,即便是JDBC也都提供了统一的事务模型,确保数据访问方式的变更不会影响到代码实现层面。事务管理的良好封装,一方面提升了开发效率,但同时也要注意到其降低了开发者了解底层原理的动机和意愿。扪心自问,我们真正了解在多线程环境中事务运行的机制吗?例如在一个事务里面是否可以支持多个线程同时进行数据写入?针对这个问题,网上很多论坛给出了确定的答案,但也不乏反馈@Transaction失效的声音。
究其背后的根源是Spring实现事务通过ThreadLocal把事务和当前线程进行了绑定。ThreadLocal作为本地线程变量载体,保存了当前线程的变量,并确保所有变量是线程安全的。这些封闭隔离的变量中就包含了数据库连接,Session管理的对象以及当前事务运行的其他必要信息,而开启的新线程是获取不到这些变量和对象的。不了解这些,事务内部冒然启用多线程,受限于业务场景,大多数情况下是不会有问题的,但是作为严谨的开发万不能忽视其潜在的风险。问题主要集中在两个方面:一方面导致事务失效,看似是提高了处理效率,但是一旦有异常相关数据将不会回滚,就会破坏业务的完整性。另一方面还会增加死锁的概率,无计划的并发处理,增加资源争抢的概率,其后果就是死锁,产生的异常进一步破坏业务的完整性,得不偿失。
难道就没有提升事务内处理性能的方法了?非也!虽然不能通过事务内,发起多线程处理。我们可以通过合理分块后,再启用多线程处理,通过类似分布式事务方式达到异曲同工的效果。
假设我们要并行处理一个大的对象列表,然后将它们存储到数据库中。我们先将这些对象分组,将每个块传递给不同线程分别去调用加了事务的处理方法,最后将每个线程中处理的结果收集汇总。这样通过事务的传播机制既确保了业务的完整性,也通过并行处理提升了处理效率。下面通过具体的示例,逐步演示如何实现。
第一步:定义一个负责对象处理逻辑的服务接口。
public interface ProcessingService {
List processObjects(List objectIds);
}
第二步:针对上述对象处理的接口的一个简单实现。
@Service("ProcessingDBService")
public class ProcessingDBService implements ProcessingService {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Transactional
@Override
public List processObjects(List objectIds) {
// Process and save to DB
logger.info("Running in thread " + Thread.currentThread().getName() + " with object ids " + objectIds.toString());
return objectIds.stream().collect(Collectors.toList());
}
}
第三步:也是最核心的一步,通过分块然后进行并行处理。当然为了保持代码的整洁性和隔离性,我们将在后续具体实现中使用Decorator修饰模式。
@Service
@Primary
@ConditionalOnProperty(prefix = "service", name = "parallel", havingValue = "true")
public class ProcessingServiceParallelRunDecorator implements ProcessingService {
private ProcessingService delegate;
public ProcessingServiceParallelRunDecorator(ProcessingService delegate) {
this.delegate = delegate;
}
private int batchSize = 10;
@Override
public List<Integer> processObjects(List objectIds) {
List<List<Integer>> chuncks = getBatches(objectIds, batchSize);
List<List<Integer>> processedObjectIds = chuncks.parallelStream().map(delegate::processObjects)
.collect(Collectors.toList());
return flatList(processedObjectIds);
}
private List> getBatches(List collection, int batchSize) {
return IntStream.iterate(0, i -> i < collection.size(), i -> i + batchSize)
.mapToObj(i -> collection.subList(i, Math.min(i + batchSize, collection.size())))
.collect(Collectors.toList());
}
private List flatList(List> listOfLists) {
return listOfLists.stream().collect(ArrayList::new, List::addAll, List::addAll);
}
最后,我们通过一个简单的单元测试验证一下执行的结果。
private List> getBatches(List collection, int batchSize) {
return IntStream.iterate(0, i -> i < collection.size(), i -> i + batchSize)
.mapToObj(i -> collection.subList(i, Math.min(i + batchSize, collection.size())))
.collect(Collectors.toList());
}
private List flatList(List> listOfLists) {
return listOfLists.stream().collect(ArrayList::new, List::addAll, List::addAll);
}
}
通过输出日志,我们看到如下的执行结果:
ProcessingDBService: Running in thread ForkJoinPool.commonPool-worker-3 with object ids [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
ProcessingDBService: Running in thread main with object ids [11, 12]
执行结果也是符合预期目标的。List对象分组后,除了主线程又通过ForkJoin启动另外线程进行并行处理。ProcessingServiceParallelRunDecorator 的parallelStream().map的并行处理提升了处理性能,而ProcessingDBService中processObjects这个public方法上@Transactional的注解保证了业务完整性,问题得以完美解决。
译者介绍
胥磊,51CTO社区编辑,某头部电商技术副总监,关注Java后端开发,技术管理,架构优化,分布式开发等领域。
原文Multi-Threading and Spring Transactions,作者:Daniela Kolarova