前言
用过Spring的人多多少少也都用过@Async注解,至于作用嘛,看注解名,大概能猜出来,就是在方法执行的时候进行异步执行。
一、如何使用@Async
使用@Async注解主要分两步:
1.在配置类上添加@EnableAsync注解
@ComponentScan(value = "com.wang")
@Configuration
@EnableAsync
public class AppConfig {
}
2.在想要异步执行的方法上面加上@Async
@Service
public class CycleService2 {
@Autowired
private CycleService1 cycleService1;
@Async
public void alsoDo() {
System.out.println("create cycleService2");
}
}
二、源码解读
1.@EnableAsync的作用
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
Class<? extends Annotation> annotation() default Annotation.class;
boolean proxyTargetClass() default false;
AdviceMode mode() default AdviceMode.PROXY;
int order() default Ordered.LOWEST_PRECEDENCE;
}
2. AsyncConfigurationSelector的作用
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
@Override
@Nullable
public String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
return new String[] {ProxyAsyncConfiguration.class.getName()};
case ASPECTJ:
return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
default:
return null;
}
}
}
看过我之前博客的同学应该知道,其实此处就是往Spring容器中增加一个新的需要扫描的类,很明显可以看到差别主要集中在adviceMode的差别上。
3. adviceMode:PROXY(默认值)
引入了ProxyAsyncConfiguration配置类
3.1 ProxyAsyncConfiguration
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
bpp.configure(this.executor, this.exceptionHandler);
Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
bpp.setAsyncAnnotationType(customAsyncAnnotation);
}
bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
return bpp;
}
}
作用也很明显,就是往spring容器中添加了AsyncAnnotationBeanPostProcessor类
3.2 AsyncAnnotationBeanPostProcessor
public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
// 删除了一些无关紧要,或者默认不会设置的属性
public AsyncAnnotationBeanPostProcessor() {
setBeforeExistingAdvisors(true);
}
@Override
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(beanFactory);
this.advisor = advisor;
}
}
其实可以看到最重要的方法,就是setBeanFactory了,该方法是在AsyncAnnotationBeanPostProcessor的生命周期最后一步initializeBean里面的第一小步,也就是执行所有Aware接口的时候执行。
对于AOP来说,其实最主要的就是advice+pointcut,也就是advisor,在生命周期的这一步,也创建了advisor。
3.3 AsyncAnnotationAdvisor
public AsyncAnnotationAdvisor(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
asyncAnnotationTypes.add(Async.class);
try {
asyncAnnotationTypes.add((Class<? extends Annotation>)
ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
}
catch (ClassNotFoundException ex) {
// If EJB 3.1 API not present, simply ignore.
}
this.advice = buildAdvice(executor, exceptionHandler);
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
可以看到最主要的工作就是buildAdvice和buildPointcut。advice的作用是定义在方法执行方面,该如何执行;pointcut的作用是定义方法的范围
3.3.1 buildAdvice
protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
// new了一个interceptor
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
interceptor.configure(executor, exceptionHandler);
return interceptor;
}
可以看到advice主要就是定义了一个烂机器interceptor,在方法执行的时候进行一些拦截,至于executor,是方法执行器,默认为null,exceptionHandler也默认是null。
3.3.1.1 AnnotationAsyncExecutionInterceptor,异步执行的原理
在AnnotationAsyncExecutionInterceptor的父类AsyncExecutionInterceptor中,实现了拦截器的接口方法invoke,也就是真实的方法执行逻辑。
@Override
@Nullable
public Object invoke(final MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}
//将当前方法执行封装成一个callable对象,然后放入到线程池里
Callable<Object> task = () -> {
try {
Object result = invocation.proceed();
if (result instanceof Future) {
return ((Future<?>) result).get();
}
}
catch (ExecutionException ex) {
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
}
catch (Throwable ex) {
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
};
//任务提交
return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
可以看到主要做的事情是:
- 寻找任务执行器:
- 从@Async注解里面获取配置的任务执行器
- 从Spring容器中找TaskExecutor类的bean
- 从spring容器中获取名为"taskExecutor"的bean,
- 如果还没有,new SimpleAsyncTaskExecutor())可以看到其实我们是可以给@Async进行任务执行器的配置的。
- 将具体的方法封装成callable的对象,然后doSubmit
- 此处我们就看一下默认的doSumit,使用的SimpleAsyncTaskExecutor是如何实现的
- 最终会执行到下面这个doExecute方法,默认情况下threadFactory是null,所以默认情况下,我们的方法,每次都是被创建了一个新的守护线程来进行方法的执行。
protected void doExecute(Runnable task) {
Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
thread.start();
}
3.3.1.2 自定义任务执行器
- 可以在配置类里new SimpleAsyncTaskExecutor(),然后setThreadFactory,这样修改了默认线程的产生方式
- 比较主流的方式是,定义一个ThreadPoolTaskExecutor,也就是线程池任务执行器,可以进行线程复用
3.3.2 buildPointcut
protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
ComposablePointcut result = null;
for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
// 就是根据这两个匹配器进行匹配的
// 检查类上是否有@Async注解
Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
//检查方法上是否有@Async注解
Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
if (result == null) {
result = new ComposablePointcut(cpc);
}
else {
// 取并集:类上加了@Async或者类的方法上加了@Async
result.union(cpc);
}
result = result.union(mpc);
}
return (result != null ? result : Pointcut.TRUE);
}
主要方法就是定义了一个类匹配pointcut和一个方法匹配pointcut。
4 什么时候判断进行advice的添加呢
当然就是在对某个bean进行proxy的判断的时候,也就是bean的生命周期最后一步,也是initializeBean里最后的一步,对于BeanPostProcessor的执行
3.4.1 AsyncAnnotationBeanPostProcessor#postProcessAfterInitialization
要注意的是AsyncAnnotationBeanPostProcessor的postProcessAfterInitialization方法其实是继承的是父类AbstractAdvisingBeanPostProcessor的。
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
// 没有通知,或者是AOP的基础设施类,那么不进行代理
if (this.advisor == null || bean instanceof AopInfrastructureBean) {
// Ignore AOP infrastructure such as scoped proxies.
return bean;
}
// 对已经被代理的类,不再生成代理,只是将通知添加到代理类的逻辑中
// 这里通过beforeExistingAdvisors决定是将通知添加到所有通知之前还是添加到所有通知之后
// 在使用@Async注解的时候,beforeExistingAdvisors被设置成了true,
// @Async注解之所以把beforeExistingAdvisors设置为true,是因为该advisor和其他的advisor差别太大了,从情理上讲,也应该第一个执行
// 意味着整个方法及其拦截逻辑都会异步执行
if (bean instanceof Advised) {
Advised advised = (Advised) bean;
// 判断bean是否符合该advisor的使用范围,通过pointcut来判断
if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
// Add our local Advisor to the existing proxy's Advisor chain...
if (this.beforeExistingAdvisors) {
advised.addAdvisor(0, this.advisor);
}
else {
advised.addAdvisor(this.advisor);
}
return bean;
}
}
// 如果还不是一个代理类,也需要通过eligible来判断是否符合使用该advisor的条件
if (isEligible(bean, beanName)) {
ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
if (!proxyFactory.isProxyTargetClass()) {
evaluateProxyInterfaces(bean.getClass(), proxyFactory);
}
proxyFactory.addAdvisor(this.advisor);
customizeProxyFactory(proxyFactory);
return proxyFactory.getProxy(getProxyClassLoader());
}
// No proxy needed.
return bean;
}
而在isEligible中,就是判断当前执行生命周期的bean是否满足我们的@Async注解的使用范围,主要是通过其class来判断
protected boolean isEligible(Class<?> targetClass) {
Boolean eligible = this.eligibleBeans.get(targetClass);
if (eligible != null) {
return eligible;
}
if (this.advisor == null) {
return false;
}
// 其实就是判断类是否可以进行添加该advisor,也就是判断是否符合该advisor的使用条件
// 就是把advisor的pointCut拿出来,pointCut里的classMatcher和methodMatcher拿出来对类及其方法进行判断
eligible = AopUtils.canApply(this.advisor, targetClass);
this.eligibleBeans.put(targetClass, eligible);
return eligible;
}
具体的AopUtils.canApply(this.advisor, targetClass)逻辑就不写了,就是根据pointcut里设置的classFilter和methodMatcher类判断当前bean的class是否需要进行该advisor的使用。
总结
发现@Async注解还是挺麻烦的,特别是要写一篇简单易懂的博客,更难。
默认配置实现原理:在执行的时候将method最终封装成一个Runable对象,然后new一个线程,通过线程的start方法,进行method的执行,来实现异步。
到此这篇关于Java Spring之@Async原理案例详解的文章就介绍到这了,更多相关Java Spring之@Async原理内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!