异步执行对于开发者来说并不陌生,在实际的开发过程中,很多场景多会使用到异步,相比同步执行,异步可以大大缩短请求链路耗时时间,比如:发送短信、邮件。
异步的八种实现方式:
- 线程异步
Thread/Runnable
Future
+Callable
- 异步框架
CompletableFuture
- Spring 注解
@Async
Spring ApplicationEvent
事件- 第三方异步框架,比如 Hutool 的
ThreadUtil
Guava
异步- 消息队列
1、线程异步
public class ThreadTest implements Runnable{ @Override public void run() { System.out.println(Thread.currentThread().getName()); } public static void main(String[] args) { ThreadTest threadTest = new ThreadTest(); new Thread(threadTest).start(); }}
当然,如果每次都创建一个 Thread 线程,频繁的创建、销毁,浪费系统资源,我们可以采用线程池:【Thread】线程池的 7 种创建方式及自定义线程池
2、Future 异步
public class FutureTest { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(1); Future<String> future = executor.submit(() -> { Thread.sleep(2000); return "this is future execute final result!!!"; }); //这里需要返回值时会阻塞主线程 String result = future.get(); System.out.println(result); executor.shutdown(); }}
Future的不足之处的包括以下几点:
- 无法被动接收异步任务的计算结果:虽然我们可以主动将异步任务提交给线程池中的线程来执行,但是待异步任务执行结束之后,主线程无法得到任务完成与否的通知,它需要通过get方法主动获取任务执行的结果。
- Future件彼此孤立:有时某一个耗时很长的异步任务执行结束之后,你想利用它返回的结果再做进一步的运算,该运算也会是一个异步任务,两者之间的关系需要程序开发人员手动进行绑定赋予,Future并不能将其形成一个任务流(pipeline),每一个Future都是彼此之间都是孤立的,所以才有了后面的CompletableFuture,CompletableFuture就可以将多个Future串联起来形成任务流。
- Futrue没有很好的错误处理机制:截止目前,如果某个异步任务在执行发的过程中发生了异常,调用者无法被动感知,必须通过捕获get方法的异常才知晓异步任务执行是否出现了错误,从而在做进一步的判断处理
3、CompletableFuture
关于 CompletableFuture
更多详情请看:【异步】Futurn、FutureTask、CompletionService、CompletableFuture
public static void thenRunAsync() throws ExecutionException, InterruptedException { CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread() + " cf1 do something...."); return 1; }); CompletableFuture<Void> cf2 = cf1.thenRunAsync(() -> { System.out.println(Thread.currentThread() + " cf2 do something..."); }); //等待任务1执行完成 System.out.println("cf1结果->" + cf1.get()); //等待任务2执行完成 System.out.println("cf2结果->" + cf2.get());}
4、Spring 注解 @Async
@Configurationpublic class ThreadPoolConfig { @Bean("taskExecutor") public Executor taskExecutor() { //返回可用处理器的Java虚拟机的数量 12 int i = Runtime.getRuntime().availableProcessors(); System.out.println("系统最大线程数 : " + i); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //核心线程池大小 executor.setCorePoolSize(16); //最大线程数 executor.setMaxPoolSize(20); //配置队列容量,默认值为Integer.MAX_VALUE executor.setQueueCapacity(99999); //活跃时间 executor.setKeepAliveSeconds(60); //线程名字前缀 executor.setThreadNamePrefix("asyncServiceExecutor -"); //设置此执行程序应该在关闭时阻止的最大秒数,以便在容器的其余部分继续关闭之前等待剩余的任务完成他们的执行 executor.setAwaitTerminationSeconds(60); //等待所有的任务结束后再关闭线程池 executor.setWaitForTasksToCompleteOnShutdown(true); return executor; }}
@Service@EnableAsyncpublic class AsyncServiceImpl implements AsyncService { @Override @Async("taskExecutor") public String sendSms() { System.out.println(Thread.currentThread().getName()); return null; } @Override @Async("taskExecutor") public String sendEmail() { System.out.println(Thread.currentThread().getName()); return null; }}
在实际项目中, 使用 @Async
调用线程池,推荐等方式是是使用自定义线程池的模式,不推荐直接使用 @Async
直接实现异步
5、Spring ApplicationEvent
事件
Spring 中使用事件只需要以下的几个步骤:
- 定义事件,继承
ApplicationEvent
- 定义监听,要么实现
ApplicationListener
接口,要么在方法上添加@EventListener
注解 - 定义发布事件接口,调用
ApplicationContext.publishEvent()
或者ApplicationEventPublisher.publishEvent();
- 业务调用发布事件
@Getter@Setterpublic class BaseEvent<T> extends ApplicationEvent { private T data; public BaseEvent(Object source) { super(source); } public BaseEvent(Object source, T data) { super(source); this.data = data; }}
@Componentpublic class BaseEventListener implements ApplicationListener<BaseEvent<UserVo>> { @Override @Async("taskExecutor") public void onApplicationEvent(BaseEvent<UserVo> baseEvent) { UserVo eventData = baseEvent.getData(); // TODO 业务处理 }}
@Autowiredprivate ApplicationContext applicationContext;@GetMapping("/pubEvent")public void pubEvent() { BaseEvent<UserVo> baseEvent = new BaseEvent<>("event", new UserVo()); applicationContext.publishEvent(baseEvent);}
6、Hutool 的 ThreadUtil
public static void main(String[] args) { for (int i = 0; i < 3; i++) { ThreadUtil.execAsync(() -> { ThreadLocalRandom threadLocalRandom = ThreadLocalRandom.current(); int number = threadLocalRandom.nextInt(20) + 1; System.out.println(number); }); log.info("当前第:" + i + "个线程"); } log.info("task finish!");}
7、 Guava
异步
public static void test() { ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); final ListenableFuture<Integer> listenableFuture = executorService.submit(() -> { log.info("callable execute..."); TimeUnit.SECONDS.sleep(1); return 1; }); Futures.addCallback(listenableFuture, new FutureCallback<Integer>() { @Override public void onSuccess(@Nullable Integer integer) { System.out.println("Get listenable future's result with callback " + integer); } @Override public void onFailure(Throwable throwable) { throwable.printStackTrace(); } }, Executors.newCachedThreadPool());}
8、 消息队列
常用的消息队列:RabbitMq
、RocketMq
来源地址:https://blog.csdn.net/sco5282/article/details/131112037