文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Spring Boot之@Async异步线程池示例详解

2024-04-02 19:55

关注

前言

很多业务场景需要使用异步去完成,比如:发送短信通知。要完成异步操作一般有两种:

我们来看看Spring框架中如何去使用线程池来完成异步操作,以及分析背后的原理。

一. Spring异步线程池的接口类 :TaskExecutor

在Spring4中,Spring中引入了一个新的注解@Async,这个注解让我们在使用Spring完成异步操作变得非常方便。

Spring异步线程池的接口类,其实质是java.util.concurrent.Executor

Spring 已经实现的异常线程池:

1. SimpleAsyncTaskExecutor:不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程。

2. SyncTaskExecutor:这个类没有实现异步调用,只是一个同步操作。只适用于不需要多线程的地方

3. ConcurrentTaskExecutor:Executor的适配类,不推荐使用。如果ThreadPoolTaskExecutor不满足要求时,才用考虑使用这个类

4. SimpleThreadPoolTaskExecutor:是Quartz的SimpleThreadPool的类。线程池同时被quartz和非quartz使用,才需要使用此类

5. ThreadPoolTaskExecutor :最常使用,推荐。 其实质是对java.util.concurrent.ThreadPoolExecutor的包装,

关于java-多线程和线程池:https://www.jb51.net/article/222986.htm

我们查看ThreadPoolExecutor初始化的源码就知道使用ThreadPoolExecutor。 

二、简单使用说明

Spring中用@Async注解标记的方法,称为异步方法。在spring boot应用中使用@Async很简单:

1、调用异步方法类上或者启动类加上注解@EnableAsync

2、在需要被异步调用的方法外加上@Async

3、所使用的@Async注解方法的类对象应该是Spring容器管理的bean对象;

 启动类加上注解@EnableAsync:


 
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
 
@SpringBootApplication
@EnableAsync
public class CollectorApplication {
 
	public static void main(String[] args) throws Exception {
		SpringApplication.run(CollectorApplication.class, args);
	}
}

在需要被异步调用的方法外加上@Async,同时类AsyncService加上注解@Service或者@Component,使其对象成为Spring容器管理的bean对象;


 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
 
@Service
@Transactional
public class AsyncService {
    @Async
    public void asyncMethod(String s) {
        System.out.println("receive:" + s);
    }
 
    public void test() {
        System.out.println("test");
        asyncMethod();//同一个类里面调用异步方法
    }
    @Async
    public void test2() {
        AsyncService asyncService  = context.getBean(AsyncService.class);
        asyncService.asyncMethod();//异步
    }
    
    @Async
    public Future<String> asyncInvokeReturnFuture(int i) {
        System.out.println("asyncInvokeReturnFuture, parementer="+ i);
        Future<String> future;
        try {
            Thread.sleep(1000 * 1);
            future = new AsyncResult<String>("success:" + i);
        } catch (InterruptedException e) {
            future = new AsyncResult<String>("error");
        }
        return future;
    }
}
 
//异步方法和普通的方法调用相同
asyncService.asyncMethod("123");
Future<String> future = asyncService.asyncInvokeReturnFuture(100);
System.out.println(future.get());

如果将一个类声明为异步类@Async,那么这个类对外暴露的方法全部成为异步方法。


@Async
@Service
public class AsyncClass {
    public AsyncClass() {
        System.out.println("----init AsyncClass----");
    }
    volatile int index = 0;
    public void foo() {
        System.out.println("asyncclass foo, index:" + index);
       
    }
    public void foo(int i) {
        this.index = i;
        System.out.println("asyncclass foo, index:" + i);
    }
    public void bar(int i) {
        this.index = i;
        System.out.println("asyncclass bar, index:" + i);
    }
}

这里需要注意的是:

1、同一个类里面调用异步方法不生效:原因默认类内的方法调用不会被aop拦截,即调用方和被调用方是在同一个类中,是无法产生切面的,该对象没有被Spring容器管理。即@Async方法不生效。

解决办法:如果要使同一个类中的方法之间调用也被拦截,需要使用spring容器中的实例对象,而不是使用默认的this,因为通过bean实例的调用才会被spring的aop拦截
 本例使用方法:AsyncService asyncService  = context.getBean(AsyncService.class);     然后使用这个引用调用本地的方法即可达到被拦截的目的
备注:这种方法只能拦截protected,default,public方法,private方法无法拦截。这个是spring aop的一个机制。

2、如果不自定义异步方法的线程池默认使用SimpleAsyncTaskExecutor。SimpleAsyncTaskExecutor:不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程。并发大的时候会产生严重的性能问题。

3、异步方法返回类型只能有两种:void和java.util.concurrent.Future。

      1)当返回类型为void的时候,方法调用过程产生的异常不会抛到调用者层面,

            可以通过注AsyncUncaughtExceptionHandler来捕获此类异常

      2)当返回类型为Future的时候,方法调用过程产生的异常会抛到调用者层面

三、定义通用线程池

1、定义线程池

在Spring Boot主类中定义一个线程池,public Executor taskExecutor() 方法用于自定义自己的线程池,线程池前缀”taskExecutor-”。如果不定义,则使用系统默认的线程池。


@SpringBootApplication
public class Application {
 
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
 
    @EnableAsync
    @Configuration
    class TaskPoolConfig {
 
        @Bean
        public Executor taskExecutor1() {
            ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
            pool.setCorePoolSize(5); //线程池活跃的线程数
            pool.setMaxPoolSize(10); //线程池最大活跃的线程数
            pool.setWaitForTasksToCompleteOnShutdown(true);
            pool.setThreadNamePrefix("defaultExecutor");
            return pool;
        }
 
        @Bean("taskExecutor")
        public Executor taskExecutor2() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(10);
            executor.setMaxPoolSize(20);
            executor.setQueueCapacity(200);
            executor.setKeepAliveSeconds(60);
            executor.setThreadNamePrefix("taskExecutor-");
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            executor.setWaitForTasksToCompleteOnShutdown(true);
            executor.setAwaitTerminationSeconds(60);
            return executor;
        }
    }
}

上面我们通过ThreadPoolTaskExecutor创建了一个线程池,同时设置了如下参数:

也可以单独类来配置线程池:


import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
 
 

 
@Configuration
@EnableAsync
public class MyThreadPoolConfig {
 
    private static final int CORE_POOL_SIZE = 10;
 
    private static final int MAX_POOL_SIZE = 20;
 
    private static final int QUEUE_CAPACITY = 200;
 
    public static final String BEAN_EXECUTOR = "bean_executor";
 
    
    @Bean(BEAN_EXECUTOR)
    public Executor executor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(CORE_POOL_SIZE);
        executor.setMaxPoolSize(MAX_POOL_SIZE);
        // 设置队列容量
        executor.setQueueCapacity(QUEUE_CAPACITY);
        // 设置线程活跃时间(秒)
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("SE-Pool#Task");
        // 设置拒绝策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}

同时注意需要在配置类上添加@EnableAsync,当然也可以在启动类上添加,表示开启spring的@@Async

2、异步方法使用线程池

只需要在@Async注解中指定线程池名即可


@Component
public class Task {
    //默认使用线程池
    @Async
    public void doTaskOne() throws Exception {
        System.out.println("开始做任务");
        long start = System.currentTimeMillis();
        Thread.sleep(random.nextInt(10000));
        long end = System.currentTimeMillis();
        System.out.println("完成任务耗时:" + (end - start) + "毫秒");
    }
   //根据Bean Name指定特定线程池
    @Async("taskExecutor")
    public void doTaskOne() throws Exception {
        System.out.println("开始做任务");
        long start = System.currentTimeMillis();
        Thread.sleep(random.nextInt(10000));
        long end = System.currentTimeMillis();
        System.out.println("完成任务耗时:" + (end - start) + "毫秒");
    }
}

3、通过xml配置定义线程池

Bean文件配置: spring_async.xml

1. 线程的前缀为xmlExecutor

2. 启动异步线程池配置


<!-- 等价于 @EnableAsync, executor指定线程池 -->
    <task:annotation-driven executor="xmlExecutor"/>
    <!-- id指定线程池产生线程名称的前缀 -->
    <task:executor
        id="xmlExecutor"
        pool-size="5-25"
        queue-capacity="100"
        keep-alive="120"
        rejection-policy="CALLER_RUNS"/>

启动类导入xml文件:


 
@SpringBootApplication
@ImportResource("classpath:/async/spring_async.xml")
public class AsyncApplicationWithXML {
    private static final Logger log = LoggerFactory.getLogger(AsyncApplicationWithXML.class);
 
    public static void main(String[] args) {
        log.info("Start AsyncApplication.. ");
        SpringApplication.run(AsyncApplicationWithXML.class, args);
    }
 
}

线程池参数说明

1. ‘id' : 线程的名称的前缀

2. ‘pool-size':线程池的大小。支持范围”min-max”和固定值(此时线程池core和max sizes相同)

3. ‘queue-capacity' :排队队列长度

4. ‘rejection-policy': 对拒绝的任务处理策略

5. ‘keep-alive' : 线程保活时间(单位秒)

四、异常处理

上面也提到:在调用方法时,可能出现方法中抛出异常的情况。在异步中主要有有两种异常处理方法:

1. 对于方法返回值是Futrue的异步方法:

      a) 、一种是在调用future的get时捕获异常;

      b)、 在异常方法中直接捕获异常

2. 对于返回值是void的异步方法:通过AsyncUncaughtExceptionHandler处理异常


@Component
public class AsyncException {
 
     
    @Async
    public void asyncInvokeWithException(String s) {
        log.info("asyncInvokeWithParameter, parementer={}", s);
        throw new IllegalArgumentException(s);
    }
 
 
    
    @Async
    public Future<String> asyncInvokeReturnFuture(int i) {
        System.out.println("asyncInvokeReturnFuture, parementer={}", i);
        Future<String> future;
        try {
            Thread.sleep(1000 * 1);
            future = new AsyncResult<String>("success:" + i);
            throw new IllegalArgumentException("a");
        } catch (InterruptedException e) {
            future = new AsyncResult<String>("error");
        } catch(IllegalArgumentException e){
            future = new AsyncResult<String>("error-IllegalArgumentException");
        }
        return future;
    }
 
}

实现AsyncConfigurer接口对异常线程池更加细粒度的控制

a) 创建线程自己的线程池

b) 对void方法抛出的异常处理的类AsyncUncaughtExceptionHandler


@Service
public class MyAsyncConfigurer implements AsyncConfigurer{
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
        threadPool.setCorePoolSize(1);
        threadPool.setMaxPoolSize(1);
        threadPool.setWaitForTasksToCompleteOnShutdown(true);
        threadPool.setAwaitTerminationSeconds(60 * 15);
        threadPool.setThreadNamePrefix("MyAsync-");
        threadPool.initialize();
        return threadPool;
    }
 
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new MyAsyncExceptionHandler();
    }
 
    
    class MyAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
 
        @Override
        public void handleUncaughtException(Throwable throwable, Method method, Object... obj) {
            System.out.println("Exception message - " + throwable.getMessage());
            System.out.println("Method name - " + method.getName());
            for (Object param : obj) {
                System.out.println("Parameter value - " + param);
            }
        }
 
    }
 
}

五、问题

上面也提到:如果不自定义异步方法的线程池默认使用SimpleAsyncTaskExecutor。SimpleAsyncTaskExecutor:不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程。并发大的时候会产生严重的性能问题。

一般的错误OOM:OutOfMemoryError:unable to create new native thread,创建线程数量太多,占用内存过大.

解决办法:一般最好使用自定义线程池,做一些特殊策略, 比如自定义拒绝策略,如果队列满了,则拒绝处理该任务。

到此这篇关于Spring Boot之@Async异步线程池的文章就介绍到这了,更多相关Spring Boot @Async异步线程池内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯