JAVA项目中的异步任务
使用异步任务
使用异步任务(Async Task)的主要目的是为了提高程序的响应速度和性能。异步任务允许程序在执行某些耗时操作时,不会阻塞主线程,从而使得主线程能够更快地响应用户的请求,提高用户体验。
实际开发中业务存在不同的任务场景,负责参与业务的任务可能并不是必须顺序执行,通过将一些耗时的操作放在一个单独的线程中执行,主线程就可以继续执行其他任务,可以避免了阻塞主线程,从而减少主线程返回结果的耗时。
通常使用异步任务的场景
远程调用
在进行网络请求时,通常需要等待服务器返回数据,这个过程可能会比较耗时。如果将网络请求放在主线程中执行,会导致主线程被阻塞,用户界面就会卡顿。因此,通常使用异步任务来处理网络请求,以保证主线程的流畅性。
I/O操作
访问文件系统、访问数据库都是此类操作,如果在主线程中执行,也会导致主线程被阻塞,影响用户体验。如果一次业务逻辑中存在多个I/O操作,可以将这些操作放到线程池中多个操作并行执行。
资源处理
有些时候前端传递的较多的资源(图表、数据集合等)在系统中需要进行复杂的数据清理或者转换等动作。这里可以将一批资源放在多个线程中并行执行,提供程序瞬时响应。
使用异步任务的问题
异步任务虽然能够提供程序的响应速度,但真正决定使用异步任务的时候还是要注意如何正确使用,否则异步任务会对系统带来隐患。
资源的修改
多个异步任务中同时对同一个共享资源进行修改,可能会导致数据不一致的问题。因此,需要使用同步机制来保证线程安全。
内存泄漏
在异步任务中创建了一个线程,但是没有正确地释放线程资源,在任务执行结束后,线程资源没有被回收,就可能导致内存泄漏。
异常处理
在异步任务中出现了未处理的异常,会导致程序崩溃。因此,需要合理地处理异步任务中的异常。
合理配置线程池
如果线程池的配置不合理,可能会导致线程池中的线程数量过多或过少,从而影响程序的性能。因此,需要合理地配置线程池的大小。
多线程结果使用
在异步任务执行完成后,通常需要将执行结果传递给主线程进行处理。使用回调方式需要注意线程安全和内存泄漏等问题。并且在尝试获取结果的时候注意超时问题。
死锁问题
在多线程中如果存在对多个公用资源进行加锁的情况,一定要注意加锁顺序,避免出现死锁。
异步任务效率
异步任务是为了主线程能够更快地响应用户的请求,业务最终需要执行的任务并没有减少,程序最终消耗的系统资源依旧是这么多,并且因为多线程存在线程上下文切换和通信的开销实际上异步任务会提高业务需要资源的总量。
所以在使用异步任务的时候,需要注意某些导致服务器满负载的业务中,使用多线程并不能提高系统的响应速度。但是服务器压力不大,只是想提供单次请求的响应速度,使用异步任务的确是合理的选择
项目中常用的异步任务
CompletableFuture
CompletableFuture是Java 8中新增的异步任务API,提供了一种方便的方法来处理异步任务。通过CompletableFuture可以实现异步任务的组合和链式调用,以及设置异步任务的执行结果。
Spring Async
Spring框架中用于异步任务处理的API,通过Spring Async可以在后台线程中执行异步任务。它可以方便地与Spring框架的其他组件集成,如AOP、事务管理等。
直接使用线程池创建异步任务
通过线程池,可以充分利用系统资源,避免线程的等待和切换带来的开销,提高系统的效率。
CompletableFuture
CompletableFuture 是 Java 8 引入的一种异步编程机制,它提供了一种简单而强大的方法来处理异步操作的结果。CompletableFuture 可以用于执行异步计算,等待计算完成,处理计算结果以及处理异常情况。
创建异步任务
如果想创建异步任务CompletableFuture
提供了supplyAsync
和runAsync
两个方法。
supplyAsync
supplyAsync()
方法接受一个Supplier
函数式接口作为参数,可以在其中定义异步操作,其可以设置返回内容。
public static CompletableFuture<String> buildSupplyAsync(String taskName) { return CompletableFuture.supplyAsync(() -> { SlowTask task = new SlowTask(taskName); return task.process(); }); }
runAsync
runAsync()
方法接受一个Runnable
对象作为参数,可以在其中定义异步操作,其不返回结果。
public static CompletableFuture buildRunAsync(String taskName) { return CompletableFuture.runAsync(() -> { SlowTask task = new SlowTask(taskName); task.process(); }); }
异步任务的操作
在创建完异步任务后,CompletableFuture
提供了方法可以对其进行后续操作
thenApply
将计算结果传递给下一个 CompletableFuture 对象
public static CompletableFuture buildThenApply(String taskName) { CompletableFuture<String> completableFuture = buildSupplyAsync(taskName); return completableFuture.thenApply(rest -> { System.out.println(rest); return rest + ":buildThenApply"; }); }
thenRun
方法接受一个 Runnable 对象作为参数。该方法会返回一个新的 CompletableFuture 对象,该对象的结果类型为 Void。
public static CompletableFuture buildThenRun(String taskName) { CompletableFuture<String> completableFuture = buildSupplyAsync(taskName); return completableFuture.thenRun(() -> { // 计算完成后执行 System.out.println("计算完成"); }); }
thenAccept
对计算结果进行处理,但不返回任何值。
public static void buildThenAccept(String taskName) { CompletableFuture<String> completableFuture = buildSupplyAsync(taskName); completableFuture.thenAccept(System.out::println); }
exceptionally
exceptionally 方法处理 CompletableFuture 的异常情况。该方法接受一个 Function 对象作为参数,用于处理异常情况并返回一个默认值或抛出另一个异常。
public static void buildExceptionally(String taskName) { CompletableFuture<String> completableFuture = buildSupplyAsync(taskName); completableFuture.thenApply(rest -> { // 异步执行计算,抛出异常 throw new RuntimeException("计算异常"); }); CompletableFuture<String> exceptionally = completableFuture.exceptionally(ex -> { System.out.println(ex.getMessage()); return ex.getMessage(); }); completableFuture.thenAccept(System.out::println); }
多任务执行
CompletableFuture
允许你将多个异步任务合并执行
anyOf
anyOf
在任何一个 CompletableFuture 对象完成后就会完成,并返回该 CompletableFuture 对象的计算结果。
public static void buildAnyOf() { CompletableFuture<String> future1 = buildSupplyAsync("任务1"); CompletableFuture<String> future2 = buildSupplyAsync("任务2"); CompletableFuture<Object> result = CompletableFuture.anyOf(future1, future2); }
allOf
allOf
在所有的 CompletableFuture 对象完成后就会完成,并不返回任何计算结果。
public static void buildAllOf() { try { CompletableFuture<String> future1 = buildSupplyAsync("任务1"); CompletableFuture<String> future2 = buildSupplyAsync("任务2"); CompletableFuture<Void> result = CompletableFuture.allOf(future1, future2); result.thenRun(() -> System.out.println("所有计算都已完成")); Thread.sleep(2000L); } catch (Exception e) { e.printStackTrace(); } }
结果获取
只有使用了supplyAsync
,thenApply
,exceptionally
,anyOf
的方法才能获取返回值。
get
使用get
方法等待CompletableFuture
完成,该方法会抛出异常。
public static void buildGet(String taskName) { CompletableFuture<String> completableFuture = buildSupplyAsync(taskName); completableFuture.thenApply(rest -> { // 异步执行计算,抛出异常 throw new RuntimeException("计算异常"); }); String rest = null; try { rest = completableFuture.get(); // 避免因为主线程关闭而看不到任务结束 Thread.sleep(2000L); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } System.out.println(rest); }
get
方法也提供了超时获取结果的方法
public static void buildTimeout(String taskName) { CompletableFuture<String> completableFuture = buildSupplyAsync(taskName); try { String rest = completableFuture.get(1, TimeUnit.SECONDS); System.out.println("所有计算都已完成" + rest); Thread.sleep(2000L); } catch (Exception e) { e.printStackTrace(); } }
join
使用join
方法等待 CompletableFuture
完成,该方法不会抛出异常。
public static void buildJoin(String taskName) { try { CompletableFuture<String> completableFuture = buildSupplyAsync(taskName); completableFuture.thenApply(rest -> { // 异步执行计算,抛出异常 throw new RuntimeException("计算异常"); }); String rest = completableFuture.join(); System.out.println(rest); // 避免因为主线程关闭而看不到任务结束 Thread.sleep(2000L); } catch (Exception e) { e.printStackTrace(); } }
上面例子中的SlowTask
public class SlowTask { private String taskName; public SlowTask(String taskName) { this.taskName = taskName; } public String process() { System.out.println("任务"+ taskName +"开始:" + System.currentTimeMillis()); try { Random random = new Random(); Thread.sleep(random.nextInt(1000) + 1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任务"+ taskName +"结束:" + System.currentTimeMillis()); return taskName + "success"; }}
为什么要使用CompletableFuture
对于一些简单的异步任务,相比使用线程池、使用@Async注解这种方式显然实现起来更加简单。
方便任务拆分方面,使用then*
的相关方法我们可以将异步任务中的执行序列串联起来,而后续根据需要可以轻松的调整串联的内容。使用allOf
我们也可以将多个异步任务并联起来。
如果任务较多,且在局部范围内异步,而多个异步直接又存在同步关系,使用CompletableFuture
将这些逻辑拆分到不同的内容中,使代码更加简洁和易读。
CompletableFuture 的使用注意
所有使用异步任务的操作都需要注意死锁问题。在使用 CompletableFuture 时,可能会存在死锁问题,需要注意线程安全和并发控制。
需要了解CompletableFuture
的使用用法,盲目的使用其方法有可能给你的代码带来致命缺陷。
虽然任务是异步的。但是 join
或get
这两个方法都会阻塞当前线程。
如果不能保证任务在规定时间内结束,使用get(long timeout, TimeUnit unit)
方式获取结果。
Spring Async
@Async
注解是Spring提供的异步任务执行的方式
使用Async注解
使用Async注解可以将一个方法标记为异步任务。此时该注解的方法会在调用时立即返回,而实际执行则交由线程池中的线程来处理。描述起来非常简单,但是实际操作中需要注意其使用还是有很大限制
参数
如果希望获取异步任务的结果,其返回值必须是Future
,就像下面这样。
@Asyncpublic CompletableFuture<String> doSomethingAsync() { // 异步执行的任务 return CompletableFuture.completedFuture("Task completed!");}
主动开启缓存
仅仅配置注解是不行的,还需要在Spring配置类中添加@EnableAsync
注解开启异步支持
@Configuration@EnableAsyncpublic class AppConfig { // 配置类的内容}
代理实现的异步
@Async
是通过Spring的代理机制实现的。所以使用Async方法时会面临下面问题。
- 方法的访问权限必须是
public
。 - 方法不能被
final
修饰。 - 异步方法调用同一个类中其他的异步方法,其实调用的是其原始方法,所以无法实现异步。
- 因为在异步方法中,事务上下文已经失效,所以事务将无法生效。
使用Async还是线程池
使用线程池还是Async都能提高系统的并发性能。但是相比线程池,使用注解方式实现异步任务会有下面的优势。
代码关注度更加集中
使用@Async
注解可以让开发人员专注于业务逻辑的实现,而无需关注线程的创建和管理,从而简化了代码实现。
更容易实现异步任务
对于开发人员其需要将任务设置为异步任务的时候只需要添加注解即可完成需求。
更好的可读性和可维护性
只需要查看注解就能理解业务中哪些内容是异步的行为,并且后续将任务从异步和同步进行转换时,只需要修改方法注解即可。
使用 Async的问题
使用门槛
虽然使用注解即可切换方法的同步和异步行为,但是如果不了解Async的使用限制,很可能会导致异步任务创建失败,或者其任务执行结果和实际结果背道而驰的情况。
串联多个异步任务复杂
因为使用代理模式,当异步方法需要和同一个类的其他异步方法一起执行时,我们需要使用调整代码来保证所有的方法都是异步进行的,此时会增加代码的复杂性。为了实现逻辑我们不得不使用一些比较绕圈子的方式。这可能会降低代码的可读性和可维护性。
线程安全
显示的创建线程池或者使用线程池,让开发同学能够警惕伴随多线程可能产生的问题。而使用注解创建异步任务,会导致很多同学在轻松实现异步任务的时候忽略掉实现异步方法时需要考虑线程安全问题。并且因为其表面上看起来是一个方法调用,此时有些线程的变量会因为在不同上下文中导致数据的丢失,此时增加了排查问题的难度。
Async的线程池
如果不去主动设置@Async
使用的是SimpleAsyncTaskExecutor
,该线程池设置为。
- 核心线程数:1
- 最大线程数:无限制
- 线程池队列容量:无限制
如果有大量的异步任务需要执行,将会导致线程创建过多,从而消耗大量的系统资源,所以实际中推荐还是来自定义线程池。
自定义线程池
如果需要自定义线程池需要在@Configuration
逐渐的配置文件中实现AsyncConfigurer
接口。实现其getAsyncExecutor
方法。
@Configuration@EnableAsyncpublic class AppConfig implements AsyncConfigurer { @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(100); executor.setQueueCapacity(10); executor.setThreadNamePrefix("MyExecutor-"); executor.initialize(); return executor; } // 其他配置内容}
推荐实际中使用自定义线程池的方式使用@Async
。这样通过命名线程池可以在日志中定位@Async
任务的执行情况,以及将@Async
任务的执行线程纳入到线程监控和管理中。
直接使用线程池
线程池可以帮助您在应用程序中高效地管理和执行异步任务。它可以减少线程创建和销毁的开销,并允许您重用线程,从而提高应用程序的性能。
使用线程池执行异步任务
在创建线程池后我们可以将任务传递到线程池中执行,线程池支持下面类型的任务。
Runnable
接口创建的异步任务Callable
接口创建的异步任务CompletableFuture
类创建的异步任务FutureTask
类创建的异步任务
下面是这些任务对应的写法
Runnable
public void buildRunTask() { Executor executor = Executors.newCachedThreadPool(); executor.execute(new Runnable() { @Override public void run() { // 异步任务代码 } }); }
Callable
public void buildCallableTask() { ExecutorService executor = Executors.newCachedThreadPool(); Future<String> future = executor.submit(new Callable<String>() { @Override public String call() throws Exception { // 异步任务代码 return "异步任务执行结果"; } }); }
CompletableFuture
public void buildCompletableFutureTask() { Executor executor = Executors.newCachedThreadPool(); CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // 异步任务代码 return "异步任务执行结果"; }, executor); }
FutureTask
public void buildFutureTaskTask() { ExecutorService executor = Executors.newCachedThreadPool(); FutureTask<String> future = new FutureTask<>(new Callable<String>() { @Override public String call() throws Exception { // 异步任务代码 return "异步任务执行结果"; } }); executor.execute(future); }
每种方式的区别
Runnable
Runnable是最简单的方式,但是其无法获取到执行结果,也无法中止任务
Callable
使用Callable
我们可以获取到执行结果,并且可以中止任务,但是获取结果时需要考虑超时问题,以及任务的需要需要进行一系列业务上的判断,实际使用中要关注的状态会较多。
CompletableFuture
支持链式调用,可以串联多个异步任务。并且可以异常处理,可以方便地处理异步任务中的异常。其主要用来处理业务中存在关联的异步任务,对于简单的异步任务使用CompletableFuture
略微复杂。
FutureTask
其和Callable
是一样的逻辑。使用这种方式主要是在有些场景下,通过在其他位置创建 Callable
实例,然后在另外一处进行执行的场景。
具体使用哪种方式
上面几种异步任务并没有绝对的优劣,使用哪种异步任务主要还是取决于实际的业务场景。具体选择可以考虑下面几个方面。
是否关注异步任务结果
如果我们需要获取异步任务的执行结果,我们需要使用 Callable 接口、CompletableFuture 类或 FutureTask 类来创建异步任务。而如果我们只需要异步执行一些操作,而不需要获取执行结果,则可以使用 Runnable 接口来创建异步任务。
是否需要支持取消任务
我们需要使用 Future 接口的 cancel 方法来取消异步任务的执行。Callable 接口、CompletableFuture 类和 FutureTask 类都实现了 Future 接口,因此都支持任务的取消。
是否存在多个异步任务的关联操作
CompletableFuture可以通过链式调用,串联多个异步任务。
是否需要对任务的异常进行操作
在使用 Callable 接口、 FutureTask 类创建异步任务时,我们可以使用 try-catch 语句对异常进行处理。而CompletableFuture 类提供exceptionally()方法进行异常处理。
如何创建线程池
线程池的创建,Executors类提供了默认实现,这使得我们可以很方便的创建出线程池。
Executors
方法 | 作用 |
---|---|
Executors.newFixedThreadPool(int nThreads) | 该方法创建一个固定大小的线程池,该线程池中的线程数量固定为 nThreads |
Executors.newSingleThreadExecutor() | 该方法创建一个只包含一个线程的线程池 |
Executors.newCachedThreadPool() | 该方法创建一个可缓存的线程池,线程池的线程数量不固定,根据任务数量动态调整 |
实际上Executors提供了更多可选的内容,但是实际上并不推荐使用。并不是Executors存在什么问题。而是对于开发人员使用线程池一定要小心谨慎,很多时候多线程会出现各种奇怪的故障,而且其排查起来也非常困难。
使用Executors创建线程池的时候,开发者无法感知到实际线程池设置了哪些参数,对于会发生什么结果也并不清楚。所以为了避免出现问题,推荐直接使用ThreadPoolExecutor创建线程池。
ThreadPoolExecutor
下面是一个创建ThreadPoolExecutor线程池的参数列表
public ThreadPoolExecutor(int corePoolSize,// 核心线程数 int maximumPoolSize,// 最大线程数 long keepAliveTime,// 线程空闲时间 TimeUnit unit,// 空闲时间单位 BlockingQueue<Runnable> workQueue,// 队列容量 ThreadFactory threadFactory,// 线程工厂 RejectedExecutionHandler handler) {// 拒绝策略 ......}
线程数
线程池大小应该根据任务量和系统资源进行调整,过大的线程数会浪费系统资源,而过小的线程数又会导致任务响应不及时。
一个默认的说法是线程池大小设置为处理器数量的 1.5 倍左右。但是这个值并不是绝对的,实际中要根据考虑下面情况:
- 任务类型
- 业务需求
任务类型
如果任务是 I/O 密集型的任务时,任务的大量时间可能是在等待 I/O 操作,此时线程并不会使用CPU 资源。这个时候逐步提高线程池线程数则可以提高系统的并发。
但如果任务需要大量的计算和运算,那就将线程池按照1.5 倍左右设置。并且在实际运行中时刻监控CPU的使用情况。
业务需求
如果系统比较在意瞬时的快速响应用户请求,可以将线程池大小设置为较大的值,以提高系统的响应速度。
但是如果系统并不在意瞬时的响应,而是在意处理任务的效率。将线程池大小设置为较小的值,以避免过多的系统资源消耗和上下文切换。
队列大小
队列大小影响的是任务等待时间和任务成功率。如果队列设置过小可能会导致任务被拒绝,队列容量设置过大,可能会导致任务等待时间过长。
判断设置多大的队列实际中要考虑下面情况
- 硬件情况
- 业务需要
硬件情况
维持队列也是在消耗系统资源,很多时候拒绝任务会导致一系列的任务操作被回滚,对于有些用户可以忍耐慢却无法忍受异常。我们为了避免任务被拒绝,希望队列尽可能的大。这个时候需要考虑系统的内存,如果系统内存比较充足,可以将队列容量设置为较大的值,以提高系统的并发度。而如果系统内存较小,可以将队列容量设置为较小的值,以避免过多的系统资源消耗。
业务需要
如果用户需要快速的获得任务结果,并且他们并不是很在意结果的成功和失败(忍受系统繁忙而拒绝任务)。此时可以将队列容量设置为较小的值,以避免任务等待时间过长。
如果我们需要尽可能的处理接收到的任务,并且不是很在意响应时间,可以将队列容量设置为较大的值,以提高系统的并发度。
拒绝策略
拒绝策略是在线程池中的任务队列已满且线程池中的线程数量已经达到最大值时,用来处理新任务的策略,拒绝策略主要有下面几种。
策略 | 说明 |
---|---|
ThreadPoolExecutor.AbortPolicy | 抛出 RejectedExecutionException 异常,表示拒绝执行新任务。 |
ThreadPoolExecutor.CallerRunsPolicy | 在调用 execute 方法的线程中执行新任务 |
hreadPoolExecutor.DiscardPolicy | 直接丢弃新任务,不抛出异常 |
ThreadPoolExecutor.DiscardOldestPolicy | 丢弃队列中最老的任务,然后重新提交新任务。 |
实际中使用哪种策略取决于业务要求。
如果希望所有的任务都要得到执行,使用ThreadPoolExecutor.CallerRunsPolicy
策略可以完成需要的结果。
如果希望将线程池中的结果尽快通知用户,可以选择 ThreadPoolExecutor.AbortPolicy
策略,及时抛出服务繁忙的异常。
如果我们的任务存在时效性,越新的任务执行价值越大的时候,为了保证提交的任务需要尽快得到执行,而队列中的旧任务已经没有太大的价值。这个时候使用 ThreadPoolExecutor.DiscardOldestPolicy
策略是最合适的。
而另外一种情况,当队列中的任务对应用程序的影响不大,其执行与否并不是很重要的事情时,使用ThreadPoolExecutor.DiscardPolicy
策略,这样可以确保系统只去处理当前资源能够处理的任务量。
实际中后面两种方式都会丢弃任务且不提供异常,这种方式是否应用在生产环境中需要结合业务谨慎考虑。
线程工厂
线程工厂是用来创建线程的工具类,它将线程的创建和管理与应用程序的逻辑分离开来,使得应用程序更加灵活和可扩展。
下面是官方线程工厂的内容
static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
可以看到其主要做了下面的工作。
- 线程名称
- 线程优先级
- 线程守护状态
使用自定义的线程工厂,我们可以根据业务自行设置线程名称,便于调试和日志输出。
而对于执行异步任务的线程,我们可以通过设置线程优先级、守护状态和异常处理器等,根据业务对其逻辑进行调整。
我们可以将线程池管理器和线程工厂结合在一起,实现对线程池的管理和监控。
关闭线程池
线程池默认不会自动关闭(除非核心线程设置为0),需要手动关闭线程池。实际使用中,对于一些临时的线程池,一定要主动将其关闭,否则其会在系统中持续消耗资源。线程池的关闭提供了下面的方法。
shutdown()
方法。该方法会停止接受新的任务,并开始关闭线程池。此时线程池会继续执行已经提交的任务,但不会再接受新的任务。
awaitTermination()
方法。该方法会等待线程池中所有任务执行完毕或超时,返回 true
表示所有任务执行完毕,返回 false
表示等待超时。
shutdownNow()
方法,强制关闭线程池。该方法会尝试停止所有正在执行的任务,并返回未执行的任务列表。
使用线程池要注意的内容
- 设置合适的线程数参数和线程存活设置
- 使用合适的拒绝策略
- 推荐使用自定义的线程工厂
- 做好异常处理
- 对线程池进行管理和监控
- 如果是临时线程池需要及时关闭线程池
来源地址:https://blog.csdn.net/qq330983778/article/details/130039646