1. 简介
异步任务: 它允许将耗时的任务异步执行,从而提高系统的并发能力和响应速度。异步任务可以在启动类上使用注解@EnableAsync进行启用,并在需要异步执行的任务上使用@Async标注该方法为异步任务。通过这种方式,可以快速地在SpringBoot应用程序中实现异步处理。
任务调度: 是SpringBoot中用于管理和执行任务的机制。通过任务调用,可以轻松地调度、并行处理任务。可以在启动类上添加@EnableScheduling注解进行开启。在需要执行的调度方法上使用@Scheduled注解。
异步请求: 是Web请求的一种处理方式,它允许后端在接收到请求后,新开一个线程来执行业务逻辑,释放请求线程,避免请求线程被大量耗时的请求沾满,导致服务不可用。在SpringBoot中,异步请求可以通过Controller的返回值来控制,支持多种类型。
以上不管是任务还是请求都会在一个异步线程中执行,这异步线程是使用的同一个吗?还是各自都有各自的线程池?接下来将通过源码的方式分析他们各自使用的线程池及自定义线程池。
2. 源码分析
2.1 异步任务(@EnableAsync)
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {}
核心类AsyncConfigurationSelector
public class AsyncConfigurationSelector extends AdviceModeImportSelector {
public String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
// 向容器注册ProxyAsyncConfiguration
return new String[] {ProxyAsyncConfiguration.class.getName()};
// ...
}
}
}
ProxyAsyncConfiguration配置类中主要就是注册了BeanPostProcessor用来处理@Async注解的方法。
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
// 处理器,该处理器获取父类中的Supplier
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
bpp.configure(this.executor, this.exceptionHandler);
// ...
return bpp;
}
}
AbstractAsyncConfiguration
@Configuration(proxyBeanMethods = false)
public abstract class AbstractAsyncConfiguration implements ImportAware {
@Nullable
protected Supplier executor;
// 自动装配AsyncConfigurer,我们可以通过实现该接口来实现自己的Executor
// 默认情况下系统也没有提供默认的AsyncConfigurer.
@Autowired
void setConfigurers(ObjectProvider configurers) {
Supplier configurer = SingletonSupplier.of(() -> {
List candidates = configurers.stream().collect(Collectors.toList());
if (CollectionUtils.isEmpty(candidates)) {
return null;
}
// 如果存在多个将抛出异常(也就相当于容器中有多个Executor)
if (candidates.size() > 1) {
throw new IllegalStateException("Only one AsyncConfigurer may exist");
}
return candidates.get(0);
});
// 调用AsyncConfigurer#getAsyncExecutor方法,默认返回的null
// 所以最终这里返回的null
this.executor = adapt(configurer, AsyncConfigurer::getAsyncExecutor);
}
private Supplier adapt(Supplier supplier, Function provider) {
return () -> {
AsyncConfigurer configurer = supplier.get();
return (configurer != null ? provider.apply(configurer) : null);
};
}
}
AsyncConfigurer
public interface AsyncConfigurer {
@Nullable
default Executor getAsyncExecutor() {
return null;
}
@Nullable
default AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return null;
}
}
到此,默认情况下通过ProxyAsyncConfiguration是不能得到Executor执行器,也就是到目前为止AsyncAnnotationBeanPostProcessor中的线程池对象还是null。接下来进入到AsyncAnnotationBeanPostProcessor处理器中。
public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
public void setBeanFactory(BeanFactory beanFactory) {
// 创建切面类,而这里的executor根据上文的分析还是null。
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
// ...
}
}
AsyncAnnotationAdvisor
public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {
public AsyncAnnotationAdvisor(
@Nullable Supplier executor, @Nullable Supplier exceptionHandler) {
// 构建通知类,这里传入的executor为null
this.advice = buildAdvice(executor, exceptionHandler);
}
protected Advice buildAdvice(
@Nullable Supplier executor, @Nullable Supplier exceptionHandler)
// 创建拦截器
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
// executor为null, configure是父类(AsyncExecutionAspectSupport )方法
interceptor.configure(executor, exceptionHandler);
return interceptor;
}
// 这个方法非常重要,我们的AnnotationAsyncExecutionInterceptor 并没有
// 被容器管理,所以内部的BeanFactory对象没法注入是null,所以进行了设置
public void setBeanFactory(BeanFactory beanFactory) {
if (this.advice instanceof BeanFactoryAware) {
((BeanFactoryAware) this.advice).setBeanFactory(beanFactory);
}
}
}
AsyncExecutionAspectSupport
public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware {
public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME = "taskExecutor";
public void configure(@Nullable Supplier defaultExecutor,
@Nullable Supplier exceptionHandler) {
// 这里的defaultExecutor获取到d饿还是null,所以这里提供了默认的获取Executor的方法
this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
}
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
if (beanFactory != null) {
try {
// 从容器中查找TaskExecutor类型的Bean对象
// 由于SpringBoot提供了一个自动配置TaskExecutionAutoConfiguration
// 所以这里直接就返回了
return beanFactory.getBean(TaskExecutor.class);
}
// 如果容器中有多个TaskExecutor,那么将会抛出下面这个异常
catch (NoUniqueBeanDefinitionException ex) {
try {
// 获取beanName=taskExecutor 获取Executor
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
// 如果没有这样的Bean则结束了
catch (NoSuchBeanDefinitionException ex2) {
}
}
// 如果容器中没有TaskExecutor类型的Bean抛出该异常
catch (NoSuchBeanDefinitionException ex) {
try {
// 这里与上面一样了
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
}
}
// 最终返回null,那么程序最终也会报错。
return null;
}
}
如果Executor返回的null,那么最终程序抛出异常
public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport {
public Object invoke(final MethodInvocation invocation) throws Throwable {
// ...
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}
}
}
到此应该说清楚了系统是如何查找Executor执行器的(线程池对象)。接下来通过实例演示:
@Service
public class AsyncService {
@Async
public void async() {
System.out.printf("%s - 异步任务执行...%n", Thread.currentThread().getName()) ;
}
}
默认情况
taskExecutor-2 - 异步任务执行...
可以通过如下配置修改默认配置
spring:
task:
execution:
pool:
core-size: 2
max-size: 2
allow-core-thread-timeout: false
thread-name-prefix: pack-
输出
pack-2 - 异步任务执行...
自定义TaskExecutor
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
pool.setCorePoolSize(5);//核心线程数
pool.setMaxPoolSize(10);//最大线程数
pool.setQueueCapacity(25);//线程队列
pool.setThreadNamePrefix("pack-custom-") ;
pool.initialize();//线程初始化
return pool;
}
根据上面源码的分析,也可以自定义AsyncConfigurer
@Component
public class PackAsyncConfigurer implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
return new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100)) ;
}
}
以上就是关于异步任务的源码分析及自定义实现。
2.2 异步请求
在SpringMVC中我们可以将Controller接口的返回值定义为:DeferredResult、Callable、Reactive Types等。只要返回值是这些,我们耗时的代码就可以放到一个异步的线程中执行。这里我们一Callable为例,先看看效果
@GetMapping("/callable")
public Callable
执行结果
http-nio-8882-exec-4 - 开始时间:1705560641226
http-nio-8882-exec-4 - 结束时间:1705560641227
总耗时:1毫秒
pack-2
根据最后一行的输出,我们在配置文件中配置的生效了。也就是异步请求与异步任务这一点是相同的,默认会使用系统默认提供的线程池(上面介绍了默认的自动配置)。
源码分析
在SpringMVC中具体Controller接口的方法的调用是通过HandlerMapping,而这个具体实现是RequestMappingHandlerAdapter,所以我们就先从这里开始看
public class RequestMappingHandlerAdapter {
// 根据输出实际并没有使用这里默认的
private AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("MvcAsync");
protected ModelAndView handleInternal(HttpServletRequest request, HttpServletResponse response, HandlerMethod handlerMethod) throws Exception {
invokeHandlerMethod(request, response, handlerMethod);
}
protected ModelAndView invokeHandlerMethod(HttpServletRequest request, HttpServletResponse response, HandlerMethod handlerMethod) throws Exception {
// ...
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
// 设置异步任务执行器(线程池)
asyncManager.setTaskExecutor(this.taskExecutor);
// ...
}
public void setTaskExecutor(AsyncTaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
}
}
既然没有使用默认的线程池对象,那么这里是如果设置的系统默认TaskExecutor? 这是在容器实例化RequestMappingHandlerAdapter Bean对象时设置。
public class WebMvcConfigurationSupport {
@Bean
public RequestMappingHandlerAdapter requestMappingHandlerAdapter(...) {
RequestMappingHandlerAdapter adapter = createRequestMappingHandlerAdapter();
// ...
AsyncSupportConfigurer configurer = getAsyncSupportConfigurer();
if (configurer.getTaskExecutor() != null) {
adapter.setTaskExecutor(configurer.getTaskExecutor());
}
// ...
return adapter;
}
protected AsyncSupportConfigurer getAsyncSupportConfigurer() {
// 默认为null
if (this.asyncSupportConfigurer == null) {
this.asyncSupportConfigurer = new AsyncSupportConfigurer();
// 调用子类DelegatingWebMvcConfiguration方法(子类重写了)
configureAsyncSupport(this.asyncSupportConfigurer);
}
return this.asyncSupportConfigurer;
}
}
// 子类
public class DelegatingWebMvcConfiguration extends WebMvcConfigurationSupport {
private final WebMvcConfigurerComposite configurers = new WebMvcConfigurerComposite();
// 获取当前环境下所有自定义的WebMvcConfigurer
@Autowired(required = false)
public void setConfigurers(List configurers) {
if (!CollectionUtils.isEmpty(configurers)) {
this.configurers.addWebMvcConfigurers(configurers);
}
}
protected void configureAsyncSupport(AsyncSupportConfigurer configurer) {
// 在组合器中依次调用自定义的WebMvcConfigurer(如果有重写对应的configureAsyncSupport方法)
this.configurers.configureAsyncSupport(configurer);
}
}
SpringBoot默认提供了一个自定义的WebMvcConfigurer且重写了configureAsyncSupport方法。
public static class WebMvcAutoConfigurationAdapter implements WebMvcConfigurer {
public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
// 判断容器中是否以applicationTaskExecutor为beanName的bean
// SpringBoot自动配置有提供这样的bean(TaskExecutionAutoConfiguration)
if (this.beanFactory.containsBean(TaskExecutionAutoConfiguration.APPLICATION_TASK_EXECUTOR_BEAN_NAME)) {
// 获取bean对象
Object taskExecutor = this.beanFactory
.getBean(TaskExecutionAutoConfiguration.APPLICATION_TASK_EXECUTOR_BEAN_NAME);
if (taskExecutor instanceof AsyncTaskExecutor) {
configurer.setTaskExecutor(((AsyncTaskExecutor) taskExecutor));
}
}
// ...
}
}
到此,异步请求使用的线程池应该清楚了使用的是系统默认的线程池可通过配置文件修改默认值。我们也可以通过自定义WebMvcConfigurer来重写对应的方法实现自己的线程池对象。
总结:在默认的情况下,异步任务与异步请求使用的是同一个线程池对象。
2.3 任务调用
先通过一个示例,查看默认执行情况
@Service
public class SchedueService {
@Scheduled(cron = "*/2 * * * * *")
public void task() {
System.out.printf("%s: %d - 任务调度%n", Thread.currentThread().getName(), System.currentTimeMillis()) ;
}
}
输出
scheduling-1: 1705564144014 - 任务调度
scheduling-1: 1705564146010 - 任务调度
scheduling-1: 1705564148003 - 任务调度
scheduling-1: 1705564150005 - 任务调度
scheduling-1: 1705564152001 - 任务调度
使用的scheduling相应的线程池,每隔2s执行任务。
源码分析
@Import(SchedulingConfiguration.class)
public @interface EnableScheduling {}
核心类SchedulingConfiguration
@Configuration(proxyBeanMethods = false)
public class SchedulingConfiguration {
// 该bean专门用来处理@Scheduled注解的核心处理器类
@Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
return new ScheduledAnnotationBeanPostProcessor();
}
}
ScheduledAnnotationBeanPostProcessor
public class ScheduledAnnotationBeanPostProcessor {
// 看到这个就能猜到,系统默认是获取的taskScheduler bean对象
public static final String DEFAULT_TASK_SCHEDULER_BEAN_NAME = "taskScheduler";
public void onApplicationEvent(ContextRefreshedEvent event) {
// 是同一个容器
if (event.getApplicationContext() == this.applicationContext) {
// 注册任务
finishRegistration();
}
}
private void finishRegistration() {
if (this.beanFactory instanceof ListableBeanFactory) {
// 获取所有SchedulingConfigurer,我们可以实现该接口自定义配置线程池
Map beans = ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
List configurers = new ArrayList<>(beans.values());
AnnotationAwareOrderComparator.sort(configurers);
for (SchedulingConfigurer configurer : configurers) {
// 在这里可以自定义的不仅仅是线程池了,还有其它的东西
configurer.configureTasks(this.registrar);
}
}
// 在默认情况下,上面是没有自定义的SchedulingConfigurer。
// 有调度的任务并且当前的调度任务注册器中没有线程池对象
if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
try {
// 通过类型查找TaskScheduler类型的bean对象。这里就会获取到系统默认的
// 由TaskSchedulingAutoConfiguration自动配置
this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
}
// 如果查找到了多个则进入这个catch(如你自己也定义了多个)
catch (NoUniqueBeanDefinitionException ex) {
try {
// 通过beanName进行查找
this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
}
catch (NoSuchBeanDefinitionException ex2) {
}
}
catch (NoSuchBeanDefinitionException ex) {
try {
// 如果不存在则在查找ScheduledExecutorService
this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
}
// 如果有多个则再根据名字查找
catch (NoUniqueBeanDefinitionException ex2) {
try {
this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
}
}
}
}
this.registrar.afterPropertiesSet();
}
}
以上就是任务调用如何查找使用线程池对象的。根据上面的分析我们也可以通过如下的方式进行自定义处理。
自定义SchedulingConfigurer
@Component
public class PackSchedulingConfigurer implements SchedulingConfigurer {
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler() ;
taskScheduler.setThreadNamePrefix("pack-schedule-") ;
taskScheduler.afterPropertiesSet() ;
taskRegistrar.setTaskScheduler(taskScheduler ) ;
}
}
执行结果
pack-schedule-1: 1705567234013 - 任务调度
pack-schedule-1: 1705567236011 - 任务调度
自定义ThreadPoolTaskScheduler
@Bean
public ThreadPoolTaskScheduler taskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler() ;
scheduler.setThreadNamePrefix("pack-custom-scheduler") ;
return scheduler ;
}
执行结果
pack-custom-scheduler1: 1705567672013 - 任务调度
pack-custom-scheduler1: 1705567674011 - 任务调度
通过配置文件自定义默认配置
spring:
task:
scheduling:
pool:
size: 2
thread-name-prefix: pack-custom-
注意:系统默认的任务调用pool.size=1。所以如果你有多个调度任务要当心了。