文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

@EnableAsync的使用、进阶、源码分析

2023-10-24 11:16

关注

@EnableAsync使用

基础使用

使用@EnableAsync开启异步切面,然后在异步调用的方法上加上@Asyc注解即可

@SpringBootApplication@EnableAsync //开启异步切面public class SpringdemoApplication {    public static void main(String[] args) {        SpringApplication.run(SpringdemoApplication.class, args);    }}
@Servicepublic class AsyncTestServiceImpl implements AsyncTestService {    @Async //异步    @Override    public void invokeAsyncTest01() {        System.out.println(Thread.currentThread() + "运行了invokeAsyncTest01方法!");    }}

自定义异步注解

@Async注解是异步切面默认的异步注解,我们可以在@EnableAsync(annotation = AsyncCustom.class)开启异步切面时指定自定义的异步注解

@Target({ElementType.TYPE, ElementType.METHOD})@Retention(RetentionPolicy.RUNTIME)@Documentedpublic @interface AsyncCustom {}
@SpringBootApplication@EnableAsync(annotation = AsyncCustom.class)public class SpringdemoApplication {    public static void main(String[] args) {        SpringApplication.run(SpringdemoApplication.class, args);    }}
@Servicepublic class AsyncTestServiceImpl implements AsyncTestService {    @AsyncCustom //异步    @Override    public void invokeAsyncTest01() {        System.out.println(Thread.currentThread() + "运行了invokeAsyncTest01方法!");    }}

@EnableAsync进阶

线程池配置

配置默认线程池

当@Async注解的value没有指定线程池名称时,将会使用此线程池,不手动设置默认线程池,系统也会给你创建一个默认线程池(详细流程请看 线程池获取优先级)。

@Slf4j@Componentpublic class AsyncConfig implements AsyncConfigurer {    @Override    public Executor getAsyncExecutor() {//此处最好使用new ThreadPoolExecutor显示创建线程池,SimpleAsyncTaskExecutor没有复用线程        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();        taskExecutor.setThreadNamePrefix("CustomAsync-Test-");        return taskExecutor;    }}
@Servicepublic class AsyncTestServiceImpl implements AsyncTestService {    @Async    @Override    public void invokeAsyncTest01() {        System.out.println(Thread.currentThread() + "运行了invokeAsyncTest01方法!");    }}

指定线程池 (建议,根据业务进行线程池隔离)

当@Async注解的value有指定线程池名称时,将会使用容器中beanname=此value值的Executor线程池

@Configurationpublic class TaskExecutorConfig {    @Bean    public Executor deleteFileExecutor() {//此处最好使用new ThreadPoolExecutor显示创建,SimpleAsyncTaskExecutor没有复用线程        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();        taskExecutor.setThreadNamePrefix("delete-file-");        return taskExecutor;    }    @Bean    public Executor sendEmailExecutor() {//此处最好使用new ThreadPoolExecutor显示创建线程池,SimpleAsyncTaskExecutor没有复用线程        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();        taskExecutor.setThreadNamePrefix("send-email-");        return taskExecutor;    }}
@Servicepublic class AsyncTestServiceImpl implements AsyncTestService {    @Async("deleteFileExecutor")    @Override    public void deleteFile() {        System.out.println(Thread.currentThread() + "运行了deleteFile方法!");    }    @Async("sendEmailExecutor")    @Override    public void sendEmail() {        System.out.println(Thread.currentThread() + "运行了sendEmail方法!");    }}

异步任务结果

只要是异步,一般都有可能用到需要返回结果的异步任务,当然@Async也支持异步结果返回,目前仅支持CompletableFuture、ListenableFuture、Future

CompletableFuture

@RestController@RequestMapping("/testasync")public class TestAsyncController {    @Autowired    private AsyncTestService asyncTestService;    @GetMapping("/test02")    public void test02() {        CompletableFuture<String> completableFuture = asyncTestService.invokeAsyncTest02();        completableFuture.thenAccept(System.out::println);    } }
@Servicepublic class AsyncTestServiceImpl implements AsyncTestService {    @Async    @Override    public CompletableFuture<String> invokeAsyncTest02() {        System.out.println(Thread.currentThread() + "运行了invokeAsyncTest02方法!");        return CompletableFuture.completedFuture("Hello world!");    }}

ListenableFuture

@RestController@RequestMapping("/testasync")public class TestAsyncController {    @Autowired    private AsyncTestService asyncTestService;    @GetMapping("/test03")    public void test03() {        ListenableFuture<String> stringListenableFuture = asyncTestService.invokeAsyncTest03();        stringListenableFuture.addCallback(System.out::println, System.out::println);    }}
@Servicepublic class AsyncTestServiceImpl implements AsyncTestService {    @Async    @Override    public ListenableFuture<String> invokeAsyncTest03() {        System.out.println(Thread.currentThread() + "运行了invokeAsyncTest03方法!");        return new AsyncResult<String>("Hello World!");    }}

Future

@RestController@RequestMapping("/testasync")public class TestAsyncController {    @Autowired    private AsyncTestService asyncTestService;    @GetMapping("/test04")    public void test04() throws ExecutionException, InterruptedException {        Future<String> future = asyncTestService.invokeAsyncTest04();        String str = future.get();        System.out.println(str);    }}
@Servicepublic class AsyncTestServiceImpl implements AsyncTestService {    @Async    @Override    public Future<String> invokeAsyncTest04() {        System.out.println(Thread.currentThread() + "运行了invokeAsyncTest04方法!");        return new AsyncResult<>("Hello World!");    }}

Future、ListenableFuture、CompletableFuture区别

异常处理器

当返回值是Future及其子类

此时,如果异步任务在执行时抛出异常时,异常先会存储在Future中并记录状态,当正真调用future.get()等获取结果函数时才会抛出异常。

@RestController@RequestMapping("/testasync")public class TestAsyncController {    @Autowired    private AsyncTestService asyncTestService;        @GetMapping("/test04")    public void test04() throws ExecutionException, InterruptedException {        Future<String> future = asyncTestService.invokeAsyncTest04();        //此时当当前线程获取结果时 才会抛出异常        String str = future.get();        System.out.println(str);    }}
@Servicepublic class AsyncTestServiceImpl implements AsyncTestService {    @Async    @Override    public Future<String> invokeAsyncTest04() {        System.out.println(Thread.currentThread() + "运行了invokeAsyncTest04方法!");        if(true){            throw new IllegalArgumentException("Hello sendEmailExecutor Exception!");        }        return new AsyncResult<>("Hello World!");    }}

当返回值是非Future

返回类型非Future时,任务发生异常将会调用异常处理器处理异常。异常处理器阔以AsyncConfigurer 实现类的getAsyncUncaughtExceptionHandler方法手动设置,如果未设置异常处理器,系统将会给你创建一个默认的SimpleAsyncUncaughtExceptionHandler异常处理器,此默认异常处理器异常处理器只对异常进行了日志输出

@Slf4j@Componentpublic class AsyncConfig implements AsyncConfigurer {    @Override    public Executor getAsyncExecutor() {        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();        taskExecutor.setThreadNamePrefix("CustomAsync-Test-");        return taskExecutor;    }    @Override    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {        return (ex, method, params) -> {            System.err.println("Unexpected exception occurred invoking async method: " + method + ":" + ex.getMessage());        };    }}
@RestController@RequestMapping("/testasync")public class TestAsyncController {    @Autowired    private AsyncTestService asyncTestService;    @GetMapping("/test06")    public void test06() {        asyncTestService.invokeAsyncTest06();    }}
@Servicepublic class AsyncTestServiceImpl implements AsyncTestService {    @Async    @Override    public void invokeAsyncTest06() {        System.out.println(Thread.currentThread() + "运行了invokeAsyncTest06方法!");        throw new IllegalArgumentException("Hello Exception!");    }}

扩展异常处理器

原因

博主通过源码发现,异常处理器只能设置一个,且后续所有@Async使用的线程池全都只有走我们设置的默认异常处理器,如果我们根据业务划分了线程池,不同线程池的异常想走不同的处理逻辑,就只有在我们手动设置的异常处理器中进行逻辑判断,非常的不优雅。

博主的解决方案
  1. 扩展@Async注解,添加exceptionHandler属性指定异常处理器AsyncUncaughtExceptionHandler 的容器名
  2. 在设置AsyncConfigurer 实现类getAsyncUncaughtExceptionHandler方法设置一个自定义异常处理器,此处理器读取异常方法@Async的exceptionHandler属性值,然后获取到容器中名为exceptionHandler属性值的异常处理器
  3. 如果能在容器找到给定容器名称的异常处理器,就走此异常处理器
  4. 如果不能找到给定容器名称的处理器,就走默认异常处理器
  5. 如果没有设置@Async的exceptionHandler属性值,也走默认异常处理器
方案实现

扩展@Async注解,添加@JokerAsync继承@Async,添加exceptionHandler属性

@Target({ElementType.TYPE, ElementType.METHOD})@Retention(RetentionPolicy.RUNTIME)@Documented@Asyncpublic @interface JokerAsync {    @AliasFor(annotation = Async.class)    String value() default "";    String exceptionHandler() default "";}

把AsyncConfigurer 实现类getAsyncUncaughtExceptionHandler方法设置一个自定义异常处理器,此处理器读取异常方法@Async的exceptionHandler属性值,然后获取到容器中名为exceptionHandler属性值的异常处理器

@Slf4j@Componentpublic class AsyncConfig implements AsyncConfigurer {    @Autowired(required = false)    private Map<String, AsyncUncaughtExceptionHandler> exceptionHandlerMap = new HashMap<>();    private final AsyncUncaughtExceptionHandler defaultExceptionHandler = new SimpleAsyncUncaughtExceptionHandler();    @Override    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {        return (ex, method, params) -> {            String qualifier = getExceptionHandlerQualifier(method);            AsyncUncaughtExceptionHandler exceptionHandler = null;            if (Objects.nonNull(qualifier) && qualifier.length() > 0) {                exceptionHandler = exceptionHandlerMap.get(qualifier);            }            if (Objects.isNull(exceptionHandler)) {                exceptionHandler = defaultExceptionHandler;            }            exceptionHandler.handleUncaughtException(ex, method, params);        };    }    protected String getExceptionHandlerQualifier(Method method) {        JokerAsync async = AnnotatedElementUtils.findMergedAnnotation(method, JokerAsync.class);        if (async == null) {            async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), JokerAsync.class);        }        return (async != null ? async.exceptionHandler() : null);    }}

测试示例代码

@Slf4j@Componentpublic class DeleteFileAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {    @Override    public void handleUncaughtException(Throwable ex, Method method, Object... params) {        log.error("DeleteFileAsyncUncaughtExceptionHandler Unexpected exception occurred invoking async method: " + method, ex);    }}
@Slf4j@Componentpublic class SendFileAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {    @Override    public void handleUncaughtException(Throwable ex, Method method, Object... params) {        log.error("SendFileAsyncUncaughtExceptionHandler Unexpected exception occurred invoking async method: " + method, ex);    }}
@Servicepublic class AsyncTestServiceImpl implements AsyncTestService {    @JokerAsync(exceptionHandler = "deleteFileAsyncUncaughtExceptionHandler")    @Override    public void deleteFile() {        System.out.println(Thread.currentThread() + "运行了deleteFile方法!");        throw new IllegalArgumentException("Hello deleteFileExecutor Exception!");    }    @JokerAsync(exceptionHandler = "sendFileAsyncUncaughtExceptionHandler")    @Override    public void sendEmail() {        System.out.println(Thread.currentThread() + "运行了sendEmail方法!");        throw new IllegalArgumentException("Hello sendEmailExecutor Exception!");    }}
@RestController@RequestMapping("/testasync")public class TestAsyncController {    @Autowired    private AsyncTestService asyncTestService;    @GetMapping("/sendEmail")    public void sendEmail() {        asyncTestService.sendEmail();    }    @GetMapping("/deleteFile")    public void deleteFile() {        asyncTestService.deleteFile();    }}

结果如下:不同的业务走不同的异常处理器
在这里插入图片描述

源码分析

首先咱们从@EnableAsync入口开始看起

@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Documented//使用@Import 导入AsyncConfigurationSelector类到容器中@Import(AsyncConfigurationSelector.class) public @interface EnableAsync {//自定义异步注解Class<? extends Annotation> annotation() default Annotation.class;//JDK代理 还是 CGLIB代理boolean proxyTargetClass() default false;AdviceMode mode() default AdviceMode.PROXY;int order() default Ordered.LOWEST_PRECEDENCE;}

注意使用@Import注解导入的一般会实现ImportSelector 接口,则ImportSelector 中的selectImports方法返回的类的完全限定名数组中的类会被加入到容器中;如果是实现了ImportBeanDefinitionRegistrar接口,则会调用registerBeanDefinitions的方法

public interface ImportSelector {String[] selectImports(AnnotationMetadata importingClassMetadata);@Nullabledefault Predicate<String> getExclusionFilter() {return null;}}public interface ImportBeanDefinitionRegistrar {default void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry,BeanNameGenerator importBeanNameGenerator) {registerBeanDefinitions(importingClassMetadata, registry);}default void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {}}

继续看@EnableAsync使用@Import导入的AsyncConfigurationSelector类

public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME ="org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";@Override@Nullablepublic String[] selectImports(AdviceMode adviceMode) {switch (adviceMode) {//@EnableAsync mode属性默认为AdviceMode.PROXY case PROXY:return new String[] {ProxyAsyncConfiguration.class.getName()};case ASPECTJ:return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};default:return null;}}}

看哈AsyncConfigurationSelector的父类AdviceModeImportSelector

public abstract class AdviceModeImportSelector<A extends Annotation> implements ImportSelector {public static final String DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME = "mode";protected String getAdviceModeAttributeName() {return DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME;}//importingClassMetadata 是加了@Import注解的类的元信息@Overridepublic final String[] selectImports(AnnotationMetadata importingClassMetadata) {Class<?> annType = GenericTypeResolver.resolveTypeArgument(getClass(), AdviceModeImportSelector.class);Assert.state(annType != null, "Unresolvable type argument for AdviceModeImportSelector");AnnotationAttributes attributes = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);if (attributes == null) {throw new IllegalArgumentException(String.format("@%s is not present on importing class '%s' as expected",annType.getSimpleName(), importingClassMetadata.getClassName()));}//得到加了@Import注解类上的mode属性值AdviceMode adviceMode = attributes.getEnum(getAdviceModeAttributeName());//模板方法 调用子类实现的selectImports方法得到需要导入到Spring容器中的类的String[] imports = selectImports(adviceMode);if (imports == null) {throw new IllegalArgumentException("Unknown AdviceMode: " + adviceMode);}return imports;}@Nullableprotected abstract String[] selectImports(AdviceMode adviceMode);}

由于@EnableAsync mode属性默认为AdviceMode.PROXY ,所以ProxyAsyncConfiguration类将会导入容器继续点进去看

@Configuration(proxyBeanMethods = false)@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {//把异步后置处理器放入容器中@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public AsyncAnnotationBeanPostProcessor asyncAdvisor() {Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");//异步后置处理器AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();//把线程池和异常处理器放到后置处理器中bpp.configure(this.executor, this.exceptionHandler);//得到@EnableAsync中annotation的注解Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");//自定义注解不等于默认值时 把自定义异步注解放入后置处理器中if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {bpp.setAsyncAnnotationType(customAsyncAnnotation);}//设置动态代理方式bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));return bpp;}}

看哈ProxyAsyncConfiguration 的父类AbstractAsyncConfiguration

@Configuration(proxyBeanMethods = false)public abstract class AbstractAsyncConfiguration implements ImportAware {@Nullableprotected AnnotationAttributes enableAsync;@Nullableprotected Supplier<Executor> executor;@Nullableprotected Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;//importMetadata 是加了@Import注解的类的元信息@Overridepublic void setImportMetadata(AnnotationMetadata importMetadata) {//@EnableAsync的注解属性设置给enableAsync属性this.enableAsync = AnnotationAttributes.fromMap(importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));if (this.enableAsync == null) {throw new IllegalArgumentException("@EnableAsync is not present on importing class " + importMetadata.getClassName());}}@Autowired(required = false)void setConfigurers(Collection<AsyncConfigurer> configurers) {if (CollectionUtils.isEmpty(configurers)) {return;}if (configurers.size() > 1) {throw new IllegalStateException("Only one AsyncConfigurer may exist");}AsyncConfigurer configurer = configurers.iterator().next();this.executor = configurer::getAsyncExecutor;this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;}}public interface AsyncConfigurer {//配置异步线程池@Nullabledefault Executor getAsyncExecutor() {return null;}//配置异步异常处理器@Nullabledefault AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {return null;}}

上述代码表明 把@EnableAsync注解的属性解析了设置到了AsyncAnnotationBeanPostProcessor后置处理器中,还有AsyncConfigurer配置的线程池和异常处理器也设置到了后置处理中,现在我们继续看AsyncAnnotationBeanPostProcessor后置处理器的代码

public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME =AnnotationAsyncExecutionInterceptor.DEFAULT_TASK_EXECUTOR_BEAN_NAME;//默认线程池@Nullableprivate Supplier<Executor> executor;//异常处理器@Nullableprivate Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;//异步注解@Nullableprivate Class<? extends Annotation> asyncAnnotationType;public AsyncAnnotationBeanPostProcessor() {setBeforeExistingAdvisors(true);}public void configure(@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {this.executor = executor;this.exceptionHandler = exceptionHandler;}public void setExecutor(Executor executor) {this.executor = SingletonSupplier.of(executor);}public void setExceptionHandler(AsyncUncaughtExceptionHandler exceptionHandler) {this.exceptionHandler = SingletonSupplier.of(exceptionHandler);}public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) {Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");this.asyncAnnotationType = asyncAnnotationType;}@Overridepublic void setBeanFactory(BeanFactory beanFactory) {super.setBeanFactory(beanFactory);//我们阔以看到此处创建了一个通知器 把线程池和异常处理器传进去 AsyncAnnotation  advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);if (this.asyncAnnotationType != null) {advisor.setAsyncAnnotationType(this.asyncAnnotationType);}//把类工厂传入通知器中advisor.setBeanFactory(beanFactory);//把通知器赋给本类的成员变量this.advisor = advisor;}}

上诉代码主要是把增强的advisor 类创建好并复制给了本类成员变量,
下面我们继续看此类的父类AbstractAdvisingBeanPostProcessor,应为此类实现了BeanPostProcessor 接口,所以初始化完后肯定会调用postProcessAfterInitialization方法

public abstract class AbstractAdvisingBeanPostProcessor extends ProxyProcessorSupport implements BeanPostProcessor {@Nullableprotected Advisor advisor;protected boolean beforeExistingAdvisors = false;private final Map<Class<?>, Boolean> eligibleBeans = new ConcurrentHashMap<>(256);public void setBeforeExistingAdvisors(boolean beforeExistingAdvisors) {this.beforeExistingAdvisors = beforeExistingAdvisors;}@Overridepublic Object postProcessBeforeInitialization(Object bean, String beanName) {return bean;}@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) {if (this.advisor == null || bean instanceof AopInfrastructureBean) {// Ignore AOP infrastructure such as scoped proxies.return bean;}//如果被代理过 直接把Advisor加入到代理里中的Advisor列表中if (bean instanceof Advised) {Advised advised = (Advised) bean;if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {// Add our local Advisor to the existing proxy's Advisor chain...if (this.beforeExistingAdvisors) {advised.addAdvisor(0, this.advisor);}else {advised.addAdvisor(this.advisor);}return bean;}}//如果没被代理过但是需要被代理的类 创建代理并直接加入到增强Advisor加入的Advisor列表中,并返回代理类if (isEligible(bean, beanName)) {ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);if (!proxyFactory.isProxyTargetClass()) {evaluateProxyInterfaces(bean.getClass(), proxyFactory);}proxyFactory.addAdvisor(this.advisor);customizeProxyFactory(proxyFactory);// Use original ClassLoader if bean class not locally loaded in overriding class loaderClassLoader classLoader = getProxyClassLoader();if (classLoader instanceof SmartClassLoader && classLoader != bean.getClass().getClassLoader()) {classLoader = ((SmartClassLoader) classLoader).getOriginalClassLoader();}return proxyFactory.getProxy(classLoader);}// No proxy needed.return bean;}protected boolean isEligible(Object bean, String beanName) {return isEligible(bean.getClass());}//判断此类是否需要代理protected boolean isEligible(Class<?> targetClass) {Boolean eligible = this.eligibleBeans.get(targetClass);if (eligible != null) {return eligible;}if (this.advisor == null) {return false;}eligible = AopUtils.canApply(this.advisor, targetClass);this.eligibleBeans.put(targetClass, eligible);return eligible;}protected ProxyFactory prepareProxyFactory(Object bean, String beanName) {ProxyFactory proxyFactory = new ProxyFactory();proxyFactory.copyFrom(this);proxyFactory.setTarget(bean);return proxyFactory;}protected void customizeProxyFactory(ProxyFactory proxyFactory) {}}

上述代码可以知道,只是把增强的advisor 放入代理类中,所以我们只需要看advisor 中的增强方法就知道增强的代码逻辑。我们来看advisor 成员的实现类AsyncAnnotationAdvisor,而AsyncAnnotationAdvisor是Advisor的实现类。而Advisor实现类一般会包含一般里面持有一个Advice和一个PointCut类,而Advice的子类MethodInterceptor的invoke方法就是代理的主要增强代码实现的地方

* Advice:通知,标识逻辑织入的位置(增强代码调用的地方)。* PointCut:切入点,标识对什么方法进入代理(判断哪个方法能被增强);* Advisor:通知器,是通知与切入点的集合(一般里面持有一个Advice和一个PointCut,用来标识一个切面增强)。
public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {private Advice advice;private Pointcut pointcut;public AsyncAnnotationAdvisor() {this((Supplier<Executor>) null, (Supplier<AsyncUncaughtExceptionHandler>) null);}public AsyncAnnotationAdvisor(@Nullable Executor executor, @Nullable AsyncUncaughtExceptionHandler exceptionHandler) {this(SingletonSupplier.ofNullable(executor), SingletonSupplier.ofNullable(exceptionHandler));}@SuppressWarnings("unchecked")public AsyncAnnotationAdvisor(@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);asyncAnnotationTypes.add(Async.class);try {asyncAnnotationTypes.add((Class<? extends Annotation>)ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));}catch (ClassNotFoundException ex) {// If EJB 3.1 API not present, simply ignore.}//通知实现this.advice = buildAdvice(executor, exceptionHandler);//切入点实现this.pointcut = buildPointcut(asyncAnnotationTypes);}public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) {Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");Set<Class<? extends Annotation>> asyncAnnotationTypes = new HashSet<>();asyncAnnotationTypes.add(asyncAnnotationType);this.pointcut = buildPointcut(asyncAnnotationTypes);}@Overridepublic void setBeanFactory(BeanFactory beanFactory) {if (this.advice instanceof BeanFactoryAware) {((BeanFactoryAware) this.advice).setBeanFactory(beanFactory);}}@Overridepublic Advice getAdvice() {return this.advice;}@Overridepublic Pointcut getPointcut() {return this.pointcut;}protected Advice buildAdvice(@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {//核心通知类AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);interceptor.configure(executor, exceptionHandler);return interceptor;}protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {ComposablePointcut result = null;for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);if (result == null) {result = new ComposablePointcut(cpc);}else {result.union(cpc);}result = result.union(mpc);}return (result != null ? result : Pointcut.TRUE);}}

上面代码可以知道核心通知的实现类是AnnotationAsyncExecutionInterceptor,那就继续AnnotationAsyncExecutionInterceptor代码

public class AnnotationAsyncExecutionInterceptor extends AsyncExecutionInterceptor {public AnnotationAsyncExecutionInterceptor(@Nullable Executor defaultExecutor) {super(defaultExecutor);}public AnnotationAsyncExecutionInterceptor(@Nullable Executor defaultExecutor, AsyncUncaughtExceptionHandler exceptionHandler) {super(defaultExecutor, exceptionHandler);}@Override@Nullableprotected String getExecutorQualifier(Method method) {// Maintainer's note: changes made here should also be made in// AnnotationAsyncExecutionAspect#getExecutorQualifierAsync async = AnnotatedElementUtils.findMergedAnnotation(method, Async.class);if (async == null) {async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class);}return (async != null ? async.value() : null);}}

没有看到我们需要的invoke方法,继续看父类AsyncExecutionInterceptor

public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {public AsyncExecutionInterceptor(@Nullable Executor defaultExecutor) {super(defaultExecutor);}public AsyncExecutionInterceptor(@Nullable Executor defaultExecutor, AsyncUncaughtExceptionHandler exceptionHandler) {super(defaultExecutor, exceptionHandler);}@Override@Nullablepublic Object invoke(final MethodInvocation invocation) throws Throwable {Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);//通过方法上的@Async注解里的value参数 value参数就是线程池Executor放入Spring容器的名称 ********AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);if (executor == null) {throw new IllegalStateException("No executor specified and no default executor set on AsyncExecutionInterceptor either");}//把任务调用封装成callable方法  ****************Callable<Object> task = () -> {try {Object result = invocation.proceed();if (result instanceof Future) {return ((Future<?>) result).get();}}//如果出现了异常 走异常处理器catch (ExecutionException ex) {handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());}catch (Throwable ex) {handleError(ex, userDeclaredMethod, invocation.getArguments());}return null;};//把callable 线程池 和 方法返回类型一同传到doSubmit方法 *************return doSubmit(task, executor, invocation.getMethod().getReturnType());}@Nullableprotected String getExecutorQualifier(Method method) {return null;}@Override@Nullableprotected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {Executor defaultExecutor = super.getDefaultExecutor(beanFactory);return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());}@Overridepublic int getOrder() {return Ordered.HIGHEST_PRECEDENCE;}}

我们会发现获取线程池方法和正真调用方法的doSubmit方法都是在父类AsyncExecutionAspectSupport中,继续看AsyncExecutionAspectSupport代码

public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware {public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME = "taskExecutor";protected final Log logger = LogFactory.getLog(getClass());private final Map<Method, AsyncTaskExecutor> executors = new ConcurrentHashMap<>(16);private SingletonSupplier<Executor> defaultExecutor;private SingletonSupplier<AsyncUncaughtExceptionHandler> exceptionHandler;@Nullableprivate BeanFactory beanFactory;public AsyncExecutionAspectSupport(@Nullable Executor defaultExecutor) {//默认线程池 this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));//默认异常处理器this.exceptionHandler = SingletonSupplier.of(SimpleAsyncUncaughtExceptionHandler::new);}public AsyncExecutionAspectSupport(@Nullable Executor defaultExecutor, AsyncUncaughtExceptionHandler exceptionHandler) {//默认线程池 this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));//默认异常处理器this.exceptionHandler = SingletonSupplier.of(exceptionHandler);}public void configure(@Nullable Supplier<Executor> defaultExecutor,@Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {//默认线程池this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));//默认异常处理器this.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new);}public void setExecutor(Executor defaultExecutor) {this.defaultExecutor = SingletonSupplier.of(defaultExecutor);}public void setExceptionHandler(AsyncUncaughtExceptionHandler exceptionHandler) {this.exceptionHandler = SingletonSupplier.of(exceptionHandler);}@Overridepublic void setBeanFactory(BeanFactory beanFactory) {this.beanFactory = beanFactory;}@Nullableprotected AsyncTaskExecutor determineAsyncExecutor(Method method) {//先从缓存中取AsyncTaskExecutor executor = this.executors.get(method);//没有在从容器中找if (executor == null) {Executor targetExecutor;//得到此方法中@Async属性value的值 即  容器中线程池的Bean名称String qualifier = getExecutorQualifier(method);//如果设置了value值 就从容器中获取if (StringUtils.hasLength(qualifier)) {targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);}//如果没有设置value值 就获取AsyncConfigurer配置的默认线程池 如果没有就从容器中获取TaskExecutor的实现类,如果有多个TaskExecutor实现类,就取容器bean名称为“taskExecutor”的容Bean类else {targetExecutor = this.defaultExecutor.get();}if (targetExecutor == null) {return null;}executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));//放入缓存中this.executors.put(method, executor);}return executor;}@Nullableprotected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {if (beanFactory != null) {try {//先获取容器中TaskExecutor的实现类return beanFactory.getBean(TaskExecutor.class);}catch (NoUniqueBeanDefinitionException ex) {logger.debug("Could not find unique TaskExecutor bean. " +"Continuing search for an Executor bean named 'taskExecutor'", ex);try {//如果有多个就取名称DEFAULT_TASK_EXECUTOR_BEAN_NAME的Executor容器类return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);}catch (NoSuchBeanDefinitionException ex2) {if (logger.isInfoEnabled()) {logger.info("More than one TaskExecutor bean found within the context, and none is named " +"'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +"as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());}}}//如果容器中没有TaskExecutor的实现类 取名称DEFAULT_TASK_EXECUTOR_BEAN_NAME的Executor容器类catch (NoSuchBeanDefinitionException ex) {logger.debug("Could not find default TaskExecutor bean. " +"Continuing search for an Executor bean named 'taskExecutor'", ex);try {return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);}catch (NoSuchBeanDefinitionException ex2) {logger.info("No task executor bean found for async processing: " +"no bean of type TaskExecutor and no bean named 'taskExecutor' either");}// Giving up -> either using local default executor or none at all...}}//走完所有都没取到 线程池  那么就返回null 子类中会判断如果返回null 将new出一个默认线程池return null;}@Nullableprotected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {//如果返回类型是CompletableFuture及其子类 if (CompletableFuture.class.isAssignableFrom(returnType)) {return CompletableFuture.supplyAsync(() -> {try {return task.call();}catch (Throwable ex) {throw new CompletionException(ex);}}, executor);}//如果返回类型是ListenableFuture及其子类 else if (ListenableFuture.class.isAssignableFrom(returnType)) {return ((AsyncListenableTaskExecutor) executor).submitListenable(task);}//如果返回类型是Future及其子类 else if (Future.class.isAssignableFrom(returnType)) {return executor.submit(task);}//如果返回类型是其他else {executor.submit(task);return null;}}protected void handleError(Throwable ex, Method method, Object... params) throws Exception {//如果返回类型是Future及其子类 发生异常 则直接丢出异常if (Future.class.isAssignableFrom(method.getReturnType())) {ReflectionUtils.rethrowException(ex);}//否则 则走异常处理器else {// Could not transmit the exception to the caller with default executortry {this.exceptionHandler.obtain().handleUncaughtException(ex, method, params);}catch (Throwable ex2) {logger.warn("Exception handler for async method '" + method.toGenericString() +"' threw unexpected exception itself", ex2);}}}}

到此为止,源码已经分析的差不多了,我们阔以得出几个重点:

总结

线程池获取优先级

当@Async中value值没有指定线程池

/最好使用new ThreadPoolExecutor显示创建线程池,SimpleAsyncTaskExecutor没有复用线程

当@Async中value值指定了线程池beanname,可以根据业务进行线程池级别隔离

如果没有取到相应的线程池,比如beanname写错导致取不到相应线程池将会抛出异常

异常处理器

返回类型为Future及其子类时

返回类型不是Future及其子类

方法返回类型

CompletableFuture及其子类

//org.springframework.aop.interceptor.AsyncExecutionInterceptor#invokeCallable<Object> task = () -> {try {//@Async注释的方法调用Object result = invocation.proceed();//如果是Future类型 调用get获取结果值if (result instanceof Future) {return ((Future<?>) result).get();}}catch (ExecutionException ex) {handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());}catch (Throwable ex) {handleError(ex, userDeclaredMethod, invocation.getArguments());}return null;};//org.springframework.aop.interceptor.AsyncExecutionAspectSupport#doSubmitif (CompletableFuture.class.isAssignableFrom(returnType)) {//@Async注释的方法返回类型如果为CompletableFuture及其子类//就使用线程池执行并封装成CompletableFuture返回return CompletableFuture.supplyAsync(() -> {try {return task.call();}catch (Throwable ex) {throw new CompletionException(ex);}}, executor);}

ListenableFuture及其子类

//org.springframework.aop.interceptor.AsyncExecutionInterceptor#invokeCallable<Object> task = () -> {try {//@Async注释的方法调用Object result = invocation.proceed();//如果是Future类型 调用get获取结果值if (result instanceof Future) {return ((Future<?>) result).get();}}catch (ExecutionException ex) {handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());}catch (Throwable ex) {handleError(ex, userDeclaredMethod, invocation.getArguments());}return null;};//org.springframework.aop.interceptor.AsyncExecutionAspectSupport#doSubmitif (ListenableFuture.class.isAssignableFrom(returnType)) {//@Async注释的方法返回类型如果为ListenableFuture及其子类//就使用线程池执行并返回ListenableFuturereturn ((AsyncListenableTaskExecutor) executor).submitListenable(task);}

注意ListenableFuture.addCallback()添加回调函数时,如果异步任务还未执行完成,则回调函数由异步任务线程执行,如果异步任务已经执行完成,则是当前掉addCallback函数的线程调用回调函数

Future及其子类

//org.springframework.aop.interceptor.AsyncExecutionInterceptor#invokeCallable<Object> task = () -> {try {//@Async注释的方法调用Object result = invocation.proceed();//如果是Future类型 调用get获取结果值if (result instanceof Future) {return ((Future<?>) result).get();}}catch (ExecutionException ex) {handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());}catch (Throwable ex) {handleError(ex, userDeclaredMethod, invocation.getArguments());}return null;};//org.springframework.aop.interceptor.AsyncExecutionAspectSupport#doSubmitif (Future.class.isAssignableFrom(returnType)) {//@Async注释的方法返回类型如果为Future及其子类//就使用线程池执行并返回Futurereturn executor.submit(task);}

其他

源码分析
//org.springframework.aop.interceptor.AsyncExecutionInterceptor#invokeCallable<Object> task = () -> {try {//@Async注释的方法调用Object result = invocation.proceed();//如果是Future类型 调用get获取结果值if (result instanceof Future) {return ((Future<?>) result).get();}}catch (ExecutionException ex) {handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());}catch (Throwable ex) {handleError(ex, userDeclaredMethod, invocation.getArguments());}return null;};//org.springframework.aop.interceptor.AsyncExecutionAspectSupport#doSubmit//@Async注释的方法返回类型如果是非Future//使用线程池执行后 直接返回nullexecutor.submit(task);return null;
当返回值为void时无返回值示例
@RestController@RequestMapping("/testasync")public class TestAsyncController {    @Autowired    private AsyncTestService asyncTestService;    @GetMapping("/test05")    public void test05() {        asyncTestService.invokeAsyncTest05();    }}
@Servicepublic class AsyncTestServiceImpl implements AsyncTestService {    @Async    @Override    public void invokeAsyncTest05() {        System.out.println(Thread.currentThread() + "运行了invokeAsyncTest05方法!");    }}
当返回值为非Futute类型示例
@RestController@RequestMapping("/testasync")public class TestAsyncController {    @Autowired    private AsyncTestService asyncTestService;    @GetMapping("/test07")    public void test07() {//永远为null 如果要异步结果 请用Future封装返回结果        List<String> result = asyncTestService.invokeAsyncTest07();        System.out.println(result);    }}
@Servicepublic class AsyncTestServiceImpl implements AsyncTestService {    @Async    @Override    public List<String> invokeAsyncTest07() {        System.out.println(Thread.currentThread() + "invokeAsyncTest07!");        List<String> result = Arrays.asList("Hello World1", "Hello World2");        return result;    }    }

思考

来源地址:https://blog.csdn.net/yubaojin/article/details/116355735

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯