1 前言
在项目开发中,异步化处理是非常常见的解决问题的手段,异步化处理除了使用线程池之外,还可以使用 CompletableFuture
来实现,在多任务处理且之间存在逻辑关系的情况下,就能够体现出其巨大的优势和灵活性。CompletableFuture
底层使用的是 ForkJoinPool
线程池来实现线程的执行和调度。
2 简单使用
在使用线程池时,通常的使用方法如下所示:
ExecutorService service = Executors.newFixedThreadPool(3);
Callable<String> task1 = () ->{return "task1";};
Runnable task2 = () ->{
System.out.println("task2 ");
};
// 用于提交任务根据是否获取返回值分为 Callable 和 Runnable,分别使用 submit 和 execute 方法
service.submit(task1);
service.execute(task2);
但是在 CompletableFuture
中,使用方法还是有所区别的,是线程池和任务的结合,能够使用链式编程来处理任务之间的逻辑关系。
具体的使用如下所示:
// 使用默认线程池
CompletableFuture<String> async1 = CompletableFuture.supplyAsync(() -> {
log.info("async1 ... ");
return "async1";
});
// 使用自定义线程池
CompletableFuture<String> async1 = CompletableFuture.supplyAsync(() -> {
log.info("async1 ... ");
return "async1";
}, Executors.newSingleThreadExecutor());
// runAsync 的使用方式
CompletableFuture<Void> future = CompletableFuture.runAsync(()-> {
System.out.println("runAsync");
});
异步任务的开启一般有两个方法,supplyAsync
和 runAsync
,这两个方法的别别在于:
- 1 supplyAsync 不接受入参,但是会有返回结果。
- 2 runAsync 也是不接受入参,但是没有返回结果。
这里需要先说明一下,xxxAsync 的方法都是从使用线程池中获取一个线程来处理任务,不带 Async 结尾的方法则是使用上一任务的线程继续处理。
3 异步处理
在正式开始之前,需要讲解一下 java8 函数式编程的函数,相信大家看到这么多的函数都会头晕的,但是其中也是有规律可循的,先说三个主要的:
- 1 Function , 既然是函数,那么就会有一个入参和返回值,可以用于计算。
- 2 Comsumer , 是一个消费者,接收一个入参但是没有返回值,只用于消费。
- 3 Supplier, 是一个提供者,不接受参数,但是有一个返回值,可以用于对象的创建。
- 4 Predicate, 用来做判断使用,接收一个入参,返回值是 布尔类型的,true 或者 false。
简单的案例如下图所示:
有这基本的 4 个,就可以进行延伸了,比如 IntFunction 则是接收一个 int 类型的参数,处理完成后即可返回,前面的 Int 只是规定了入参的类型而已,再有 BiConsumer , 则是接收两个入参,Consumer 则是只能接收一个参数。依次类推就可以知道所有的函数式接口的功能,是不是很简单?
3.1 thenApply
thenApply 和 thenApplyAsync 都是接收一个 Function 参数,即接收一个参数并返回结果。区别在于前者是使用前一个任务的线程继续处理,后者是从线程池中在获取一个线程处理任务。
如上图所示,thenApply 的任务处理和 future 使用的是一个线程,但是 thenApplyAsync 就换了一个线程继续数据的处理。
3.2 thenAccept 和 thenRun
从方法名可以看到 thenAccept 和 thenRun 都是使用前一个人任务的线程进行处理的。两者都是在前一个任务完成后进行处理,区别点在于 thenAccept 接收的是一个 Consumer , 而 thenRun 接收的是一个 Runnable, 因此两者都没有返回值,但是前者可以接收并消费一个参数,但是 thenRun 不能接收参数。这两个方法的测试如下图所示:
既然这两个方法已经搞清楚了,那么 thenAcceptAsync 和 thenRunAsync 是不是就顺手学到了呢?异步编程的 API 真的是很简单。
3.3 exceptionally 异常处理
exceptionally 属于异常处理流程,如果发生异常则需要进行异常处理,需要将异常最为参数传递给 exceptionally, 而其需要的是一个 Function 参数,这里的异常处理也是同步进行的,也是采用上一个任务的线程进行处理。
// 抛出异常信息
CompletableFuture<String> exceptionally = future.exceptionally((ex) -> {
log.info("error information " + ex.getLocalizedMessage());
return ex.getMessage();
});
3.4 whenComplete 方法完成之后
这个方法是当某个任务执行完成之后进行回调,会将任务的执行结果或者执行期间的异常信息传递过来进行处理,在正常的情况下,异常信息为 null,能够得到任务的运算结果,异常情况下,异常信息不为空,返回结果为 null。这里的 whenComplete 接受的是一个 BiConsumer 函数,也就是两个入参,没有返回结果,一个是方法的返回结果,一个则是任务处理过程中的异常信息。
// 返回结果
CompletableFuture<String> whenComplete = future.whenComplete((res, ex) -> {
if (StrUtil.isNotBlank(res)) {
log.info("task execute result {}", res);
}
if (res != null) {
log.info("task error info {}", ex.getMessage());
}
});
知道了 whenComplete 方法,那么 whenCompleteAsync 方法的使用就知道了,就是异步处理了。
3.5 handle
handle 的使用和 whenComplete 方法类似,都是获取任务的结果,只不过 handle 有返回结果,接受的参数是一个 BiFunction ,那么具体的使用方法如下图所示:
// handle 处理返回结果
CompletableFuture<String> handle = future.handle((res, ex) -> {
if (StrUtil.isNotBlank(res)) {
log.info("task execute result {}", res);
return "handle result exception";
}
if (res != null) {
log.info("task error info {}", ex.getMessage());
}
return "handle result";
});
通过以上的分析,我们已经到得了以下规律:任何一个方法的实现都有三个类似的 API,一个是同步处理,一个是异步处理,一个是异步处理并指定线程池参数。目前已经介绍了 6 个 API,分别是 thenApply
, thenAccept
,thenRun
, whenComplete
, handle
和 一个异常处理 exceptionally
, 前五个举一反三就知道了其他的两个异步调用 API,掌握了其中的规律就不会觉得很多,无非就是同步异步,是否接收参数和有无返回值的区别。
4 处理组合
4.1 任务均完成后组合
thenCombine
、thenAcceptBoth
、runAfterBoth
这三个方法都是在两个 CompletableFuture
任务结束后在进行执行,区别在于是否接受参数以及是否有返回值,如图所示查看其接受的参数。
- 1
thenCombine
方法为两个,第一个为CompletionStage
对象即另一个异步任务,第二个为BiFunction
,接收两个任务的处理结果并返回处理结果。 - 2
thenAcceptBoth
方法为两个,第一个为CompletionStage
对象即另一个异步任务,第二个为BiConsumer
, 接收两个任务的处理结果不过没有返回值。 - 3
runAfterBoth
方法为两个,第一个为CompletionStage
对象即另一个异步任务,第二个为Runnable
,不接收两个任务的处理结果,也没有返回值。
下图是方法的使用案例:
既然知道了这些方法的用法,那么 thenCombineAsync
、thenAcceptBothAsync
、runAfterBothAsync
是不是就可以同理掌握了呢?
4.2 任一任务完成
前文提到的都是两个任务均完成的情况,接下来的三个方法则是任何一个任务完成即可执行下一个动作,applyToEither
、acceptEither
、runAfterEither
这三个方法都是在两个异步任务执行结果之后的处理,任何一个任务执行完毕之后就进行继续处理。
这里的任一任务执行完成和两者任务都执行完在执行是类似的,区别在于这里接收的是一个参数:
- 1 applyToEither 接收的参数是 CompletionStage 和 Function。
- 2 acceptEither 接收的参数是 CompletionStage 和 Consumer。
- 3 runAfterEither 接收的参数是 CompletionStage 和 Runnable。
这里已经学习到了 applyToEither
、acceptEither
、runAfterEither
三个方法,那么类似的 applyToEitherAsync
、acceptEitherAsync
、runAfterEitherAsync
也可以知道其具体用法。
4.3 任务处理结果
thenCompose
的用法和 thenCombine
等的用法基本都是一样的,只不过在返回参数上有所区别,结果是返回一个 Future, 入参是一个 Function 。在了解了 thenCompose
之后,那么 thenComposeAsync
的使用方法就是类似了。
CompletableFuture<String> thenCompose = future.thenCompose((res) -> {
log.info("result is {}", res);
return CompletableFuture.supplyAsync(() -> {
log.info("supplyAsync");
return "result";
});
});
4.4 所有或者任何
前面已经分享过了两个任务和一个任务的处理之后的操作,在本节中将分享 allOf
和 anyOf
,这是多个任务的聚合处理,入参都是多个 CompletableFuture
, 区别在于是任何一个任务完成后就执行后续任务,还是所有的任务都完成后再继续任务处理。
其使用方法如下所示:
CompletableFuture<Void> allOf = CompletableFuture.allOf(future);
CompletableFuture<Object> andOf = CompletableFuture.anyOf(future);
5 总结
文中,我们首先介绍了函数式编程的接口使用方法,然后分享了 CompletableFuture
的 API 使用方法。核心就是函数式编程接口,接收的是 Function
、Consumer
还是 Runable
, 其次就是否是 xxxAsync
异步处理。
到此这篇关于深入学习java8 中的CompletableFuture的文章就介绍到这了,更多相关java8 CompletableFuture 内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!