文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

并发编程:CompletableFuture异步编程没有那么难

2024-12-01 19:02

关注

根大家好,我是七哥,今天给大家分享一个非常强大的工具类:CompletableFuture,如果你平时也会遇到用多线程优化业务逻辑的场景,那么今天这篇文章我建议你读完,相信一定会让你在重构相关代码时得心应手,写出让人称赞的好代码😄,不过使用CompletableFuture的前提是JDK需要1.8以上哦~

那我们下面进入今天的正文。

  1. 项目中串行调用的例子
  2. 并行调用的几种实现
  3. CompletableFuture解析
  4. JDK8流程编程结合
  5. 总结

前言

在Java开发的web项目中,我们经常会遇到接口响应耗时过长,或者定时任务处理过慢,那在Java中最常见的解决方法就是并行了,想必大家也都不陌生了。

今天的分享主要带大家从一个实际的串行场景出发,如何一步步优化,同时也会分享在Java中实现并行处理的多种方式,以及它们之间的区别和优缺点,通过对比总结更加深入的了解并且使用Java中并发编程的相关技术。

一个串行调用的例子

现在我们有一个查询carrier下所有Load的接口,它需要查询Loads信息、Instruction信息、Stops信息、Actions信息后然后组装数据。

private List<Load> getHydratedLoads(Optional<Pageable> pageable, String predicate, List<Object> params) {
// 1. 耗时3
List<Load> loads = executeQuery("查询Loads列表");
// 2. 耗时4
List<Instruction> instructions = executeQuery("查询instructions列表");
// 3. 耗时2
List<Stop> stops = executeQuery("查询stops列表");
// 4. 耗时3
List<Action> actions = executeQuery("查询actions列表");
Multimap<String, Instruction> instructionsByLoadId = index(instructions, i -> i.getLoad().getId());
Multimap<String, Stop> stopsByLoadId = index(stops, s -> s.getLoad().getId());
Multimap<String, Action> actionsByStopId = index(actions, a -> a.getStop().getId());
// 数据处理
handle(loads,instructions,stops,actions);
return loads;
}

这段代码会有什么问题?其实算是一段比较正常的代码,但是在某一个carrier下数据量比较大时,sql查询是相对较慢的,那有没有办法优化一下呢?

当前这个请求耗时总计就是12s。上面实现中查询Load、Instruction、Stop、Action等信息是串行的,那串行的系统要做性能优化很常见的就是利用多线程并行了。

这种相互之间没有影响的任务,利用并行处理后耗时就可以优化为4s。

并行调用实现的几种方式

因为项目中多线程都用线程池,所以Thread.join()这种方式就不演示了。

1、Future+Callable

Future接口在Java5中被引入,设计初衷是对将来某个时刻会发生的结果进行建模。它建模了一种异步计算,返回一个执行运算结果的引用,当运算结束后,这个引用被返回给调用方。在Future中触发那些潜在耗时的操作把调用线程解放出来,让调用线程能继续执行其他有价值的工作,不再需要呆呆等待耗时的操作完成。

因为我们都是需要获取任务的返回值的,所以大家肯定想到是用 Future+Callable来做。

ThreadPoolExecutor提供了3个submit方法支持我们需要获取任务执行结果的需求。

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future submit(Runnable task);

简单介绍下这三个submit方法:

  1. 提交 Runnable 任务submit(Runnable task),这个方法入参是Runnable接口,它只有一个run()方法没有返回值,所以它返回的 Future
    只能用来判断任务是否结束;
  2. 提交 Callable 任务 submit(Callabletask),它的入参是Callable接口,只有一个call()方法,是有返回值的,所以可以获取任务执行结果;
  3. 提交 Runnable任务及结果引用 submit(Runnable task, Tresult),这个方法返回的Future,调用get()方法的返回值就是传入的result对象,一般用法就是实现Runnable接口时,声明一个有参构造函数,将result传进去,result相当于主线程和子线程之间的桥梁,通过它主子线程可以共享数据。

这三个方法的返回值都是Future接口,Future 提供了5个方法:

分别是取消任务的方法cancel()、判断任务是否已取消的方法 isCancelled()、判断任务是否已结束的方法 isDone()以及2 个获得任务执行结果的 get() 和get(timeout, unit),其中最后一个 get(timeout, unit) 支持超时机制。

需要注意的是:这两个 get()方法都是阻塞式的,如果被调用的时候,任务还没有执行完,那么调用 get() 方法的线程会阻塞,直到任务执行完才会被唤醒。

2、FutureTask实现并行调用

我们再介绍下FutureTask工具类,这是一个实实在在的工具类,有两个构造函数,和上面类似,一看就明白了。

FutureTask(Callable callable);
FutureTask(Runnable runnable, V result);
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
private static final class RunnableAdapter<T> implements Callable<T> {
private final Runnable task;
private final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
public String toString() {
return super.toString() + "[Wrapped task = " + task + "]";
}
}

这个类实现了Runnable 和 Future接口,可以理解就是将任务和结果结合起来了,变成一个可以有响应结果的任务进行提交,本质上FutureTask里面封装的还是一个Callable接口,它实现可以有返回值就是因为它的run方法里面调用了Callable的call()方法,将结果赋值给result,然后返回。

下面我们看下如何优化我们上面的查询接口,实现并行查询:

private List<Load> getHydratedLoadsUsingFutureTask() throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
FutureTask<List<Load>> queryLoadFutureTask = new FutureTask<>(() -> executeQuery("sql1"));
executorService.submit(queryLoadFutureTask);
FutureTask<List<Instruction>> queryInstructionFutureTask = new FutureTask<>(() -> executeQuery("sql2"));
executorService.submit(queryInstructionFutureTask);
FutureTask<List<Stop>> queryStopFutureTask = new FutureTask<>(() -> executeQuery("sql3"));
executorService.submit(queryStopFutureTask);
FutureTask<List<Action>> queryActionFutureTask = new FutureTask<>(() -> executeQuery("sql4"));
executorService.submit(queryActionFutureTask);
// 获取结果
List<Load> loads = queryLoadFutureTask.get();
List<Instruction> instructions = queryInstructionFutureTask.get();
List<Stop> stops = queryStopFutureTask.get();
List<Action> actions = queryActionFutureTask.get();
// We got all the entities we need, so now let's fill in all of their references to each other.
handleData(loads, instructions, stops, actions);
return loads;
}

那你可能会想到,如果任务之间有依赖关系,比如当前任务依赖前一个任务的执行结果,该怎么处理呢?

这种问题基本上也都可以用 Future来解决,但是需要将对应的 FutureTask传入到当前任务中,然后调用get()方法即可。

比如,我们创建了两个 FutureTask——ft1和 ft2,ft1 需要等待 ft2 执行完毕后才能做最后的数据处理,所以 ft1 内部需要引用 ft2,并在执行数据处理前,调用 ft2 的 get()方法实现等待。

// 创建任务T2FutureTask
FutureTask<String> ft2
= new FutureTask<>(new T2Task());
// 创建任务T1FutureTask
FutureTask<String> ft1
= new FutureTask<>(new T1Task(ft2));
// 线程T1执行任务ft1
Thread T1 = new Thread(ft1);
T1.start();
// 线程T2执行任务ft2
Thread T2 = new Thread(ft2);
T2.start();
// 等待线程T1执行结果
System.out.println(ft1.get());

// T1Task需要执行的任务:
class T1Task implements Callable<String>{
FutureTask<String> ft2;
// T1任务需要T2任务的FutureTask
T1Task(FutureTask<String> ft2){
this.ft2 = ft2;
}
@Override
String call() throws Exception {
// 获取T2线程结果
String tf = ft2.get();
return "处理完的数据结果";
}
}
// T2Task需要执行的任务:
class T2Task implements Callable<String> {
@Override
String call() throws Exception {
return "检验&查询数据";
}
}

通过这上面的的例子,我们明显的发现 Future 实现异步编程时的一些不足之处:

我们很难表述Future结果之间的依赖性,从文字描述上这很简单。比如,下面文字描述的关系,如果用Future去实现时还是很复杂的。

比如:“当长时间计算任务完成时,请将该计算的结果通知到另一个长时间运行的计算任务,这两个计算任务都完成后,将计算的结果与另一个查询操作结果合并”。

在JDK8中引入了CompletableFuture,对Future进行了改进,可以在定义CompletableFuture时传入回调对象,任务在完成或者异常时,自动回调,再也不需要每次主动通过Future 去询问结果了,我们接着往下看。

3、CompletableFuture

Java 在 1.8 版本提供了CompletableFuture 来支持异步编程,CompletableFuture 类实现了CompletionStage 和 Future

接口,提供了非常强大的 Future 的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过 完成时回调 的方式处理计算结果,并且提供了转换和组合 CompletableFuture 的方法。

为了体会到CompletableFuture 异步编程的优势,我们还是先用 CompletableFuture 重新实现前面的程序。

public static List<Load> getHydratedLoadsUsingCompletableFuture()
throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
try {
// 任务1:查询loads列表
CompletableFuture<List<Load>> queryLoads = CompletableFuture.supplyAsync(() -> executeQuery("sql1"), executorService);
// 任务2:查询instructions列表
CompletableFuture<List<Instruction>> queryInstructions = CompletableFuture.supplyAsync(() -> executeQuery("sql2"),
executorService);
// 任务3:查询stops列表
CompletableFuture<List<Stop>> queryStops = CompletableFuture.supplyAsync(() -> executeQuery("sql3"), executorService);
// 任务4:查询actions列表
CompletableFuture<List<Action>> queryActions = CompletableFuture.supplyAsync(() -> executeQuery("sql4"),
executorService);
// 任务1234执行完成后执行数据组装
CompletableFuture<Void> combineFuture = CompletableFuture.allOf(queryLoads,
queryInstructions,
queryStops,
queryActions)
.thenRun(() -> handleData(queryLoads.join(), queryInstructions.join(), queryStops.join(), queryActions.join()));
System.out.println(Thread.currentThread().getName() + ": 主线程执行到这里了");
combineFuture.get();

System.out.println(String.format("""
queryLoads: %squeryInstructions: %squeryStops: %squeryActions: %s
""", queryLoads.isDone(), queryInstructions.isDone(), queryStops.isDone(), queryActions.isDone()));
return queryLoads.get();
} finally {
executorService.shutdown();
}
}

通过上面的代码我们可以发现 CompletableFuture有以下优势:

CompletableFuture 解析

1、CompletableFuture创建

CompletableFuture 提供了四个静态方法来创建一个异步操作:

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

这四个方法区别在于:

ForkJoinPool是JDK7提供的,叫做分支/合并框架。可以通过将一个任务递归分成很多分子任务,形成不同的流,进行并行执行,同时还伴随着强大的工作窃取算法,极大的提高效率,这个不属于今天我们讨论的点,感兴趣的话可以后面再聊。

注意⚠️:如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。

问题:为什么supplyAsync方法接收一个 Supplier 函数式接口类型参数而不是一个 Callable 类型的参数呢?

@FunctionalInterface
public interface Callable<V> {

V call() throws Exception;
}
@FunctionalInterface
public interface Supplier<T> {


T get();
}

看了接口定义,我们发现它们其实都是一个不接受任何参数类型的函数式接口,在实践中它们做的是相同的事情(定义一个业务逻辑去处理然后有返回值),但在原则上它们的目的是做不同的事情:

2、 理解CompletionStage接口

另外 CompletableFuture 类还实现了 CompletionStage 接口,这个接口就比较关键了,之所以能实现响应式编程,都是通过这个接口提供的方法。

下面介绍下 CompletionStage 接口,看字面意思可以理解为“完成动作的一个阶段”,官方注释文档:CompletionStage 是一个可能执行异步计算的“阶段”,这个阶段会在另一个 CompletionStage 完成时调用去执行动作或者计算,一个 CompletionStage 会以正常完成或者中断的形式“完成”,并且它的“完成”会触发其他依赖的 CompletionStage 。CompletionStage 接口的方法一般都返回新的CompletionStage,因此构成了链式的调用。

这个看完还是有点懵逼的,不清楚什么是 CompletionStage?

在Java中什么是 CompletionStage ?

一个Function、Comsumer、Supplier 或者 Runnable 都会被描述为一个CompletionStage。

通过接口的继承关系,我们可以发现这里的异步操作到底什么时候结束、结果如何获取,都可以通过 Future接口来解决。

另外 CompletableFuture 类还实现了 CompletionStage 接口,这个接口就比较关键了,之所以能实现响应式编程,都是通过这个接口提供的方法。

下面介绍下 CompletionStage 接口,看字面意思可以理解为“完成动作的一个阶段”,官方注释文档:CompletionStage 是一个可能执行异步计算的“阶段”,这个阶段会在另一个 CompletionStage 完成时调用去执行动作或者计算,一个 CompletionStage 会以正常完成或者中断的形式“完成”,并且它的“完成”会触发其他依赖的 CompletionStage 。CompletionStage 接口的方法一般都返回新的CompletionStage,因此构成了链式的调用。

这个看完还是有点懵逼的,不清楚什么是 CompletionStage?

在Java中什么是 CompletionStage ?

一个Function、Comsumer、Supplier 或者 Runnable 都会被描述为一个CompletionStage。

stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println())

但是 CompletionStage 这里面一共有40多个方法,我们该如何理解呢?

CompletionStage 接口可以清晰的描述任务之间的关系,可以分为 顺序串行、并行、汇聚关系以及异常处理。

串行关系

CompletionStage 接口里面描述串行关系,主要是 thenApply、thenAccept、thenRun 和 thenCompose 这四个系列的接口。

public <U> CompletionStage<U> thenApply(Function super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function super T,? extends U> fn, Executor executor);
public CompletionStage<Void> thenAccept(Consumer super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer super T> action,Executor executor);
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
public <U> CompletionStage<U> thenCompose(Function super T, ? extends CompletionStage<U>> fn);
public <U> CompletionStage<U> thenComposeAsync(Function super T, ? extends CompletionStage<U>> fn) ;
public <U> CompletionStage<U> thenComposeAsync(Function super T, ? extends CompletionStage<U>> fn, Executor executor) ;

thenApply() 和 thenCompose() 的区别?thenApply 转换的是泛型中的类型,是同一个CompletableFuture,thenCompose 用来连接两个CompletableFuture,是生成一个新的 CompletableFuture。他们都是让 CompletableFuture 可以对返回的结果进行后续操作,就像 Stream 一样进行 map 和 flatMap 的转换。

public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> result1 = future.thenApply(param -> param + " World");
CompletableFuture<String> result2 = future.thenCompose(param -> CompletableFuture.supplyAsync(() -> param + " World"));
System.out.println(result1.get());
System.out.println(result2.get());
}

这些方法里面 Async 代表的是异步执行 fn、consumer 或者 action。

CompletableFuture<String> f0 = 
CompletableFuture.supplyAsync(
() -> "Hello World") //①
.thenApply(s -> s + " QQ") //②
.thenApply(String::toUpperCase);//③
System.out.println(f0.join());
//输出结果
HELLO WORLD QQ

可以看一下 thenApply() 方法是如何使用的。首先通过 supplyAsync() 启动一个异步流程,之后是两个串行操作,整体看起来还是挺简单的。不过,虽然这是一个异步流程,但任务①②③却是串行执行的,②依赖①的执行结果,③依赖②的执行结果。

CompletableFuture 中 thenApply 如何实现?

  1. 先看下静态创建CompletableFuture的方法 supplyAsync。
//静态方法,如果没有传入线程池,使用ForkJoinPool的common线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(ASYNC_POOL, supplier);
}
static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
Supplier<U> f) {
if (f == null) throw new NullPointerException();
//新建CompletableFuture对象
CompletableFuture<U> d = new CompletableFuture<U>();
//构造AsyncSupply对象,线程池提交AsyncSupply任务
e.execute(new AsyncSupply<U>(d, f));
//将CompletableFuture对象返回
return d;
}
static final class AsyncSupply<T> extends ForkJoinTask<Void>
//可以看到AsyncSupply是一个Runnable对象
implements Runnable, AsynchronousCompletionTask {
CompletableFuture<T> dep; Supplier extends T> fn;
AsyncSupply(CompletableFuture<T> dep, Supplier extends T> fn) {
this.dep = dep; this.fn = fn;
}
public final Void getRawResult() { return null; }
public final void setRawResult(Void v) {}
public final boolean exec() { run(); return false; }
public void run() {
CompletableFuture<T> d; Supplier extends T> f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
//CompletableFuture对象的result为空时
if (d.result == null) {
try {
//调用传入的supplier的get方法,并将结果放入result字段
//注意:这是在线程池中提交的,所以是异步处理的
d.completeValue(f.get());
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
//处理完当前方法后,处理依赖它的栈顶方法,后面的回调方法入栈和这块呼应
d.postComplete();
}
}
}
final void postComplete() {
// 变量f存储的是当前已经完成的CompletableFuture
CompletableFuture f = this; Completion h;
while ((h = f.stack) != null ||
(f != this && (h = (f = this).stack) != null)) {
CompletableFuture d; Completion t;
// CAS操作,将依赖此阶段的栈顶元素取出,并且设置为下一个
if (STACK.compareAndSet(f, h, t = h.next)) {
if (t != null) {
if (f != this) {
//如果f不是this,将刚出栈的h入this的栈顶
pushStack(h);
continue;
}
// 将h剥离出来,h.next=null,帮助gc
NEXT.compareAndSet(h, t, null); // try to detach
}
//调用tryFire
f = (d = h.tryFire(NESTED)) == null ? this : d;
}
}
}
  1. 再看下异步处理完supplyAsync 后的回调方法thenApply 方法,看看它是如何实现回调的。
public <U> CompletableFuture<U> thenApply(
Function super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
private <V> CompletableFuture<V> uniApplyStage(
Executor e, Function super T,? extends V> f) {
if (f == null) throw new NullPointerException();
Object r;
// 如果当前阶段结果已经返回,则直接运行回调方法
if ((r = result) != null)
return uniApplyNow(r, e, f);
CompletableFuture<V> d = newIncompleteFuture();
// 构造Completion放入等待栈的顶
unipush(new UniApply<T,V>(e, d, this, f));
return d;
}
private <V> CompletableFuture<V> uniApplyNow(
Object r, Executor e, Function super T,? extends V> f) {
Throwable x;
CompletableFuture<V> d = newIncompleteFuture();
// 如果依赖的方法异常中断,则直接处理并返回异常
if (r instanceof AltResult) {
if ((x = ((AltResult)r).ex) != null) {
d.result = encodeThrowable(x, r);
return d;
}
r = null;
}
try {
// 执行到这里说明依赖的任务已经有结果了,用它的结果当作参数调用回调方法
// 注意这里都是线程池中的线程在执行,所以是异步执行
if (e != null) {
e.execute(new UniApply<T,V>(null, d, this, f));
} else {
@SuppressWarnings("unchecked") T t = (T) r;
d.result = d.encodeValue(f.apply(t));
}
} catch (Throwable ex) {
d.result = encodeThrowable(ex);
}
return d;
}
final void unipush(Completion c) {
if (c != null) {
// CAS自旋将回调方法压入栈顶
while (!tryPushStack(c)) {
if (result != null) {
NEXT.set(c, null);
break;
}
}
// 可能在重试中完成,判断result不为空就执行
if (result != null)
c.tryFire(SYNC);
}
}
//再次尝试判断依赖方法是否处理完成,处理完成则调用目标回调方法
final CompletableFuture<V> tryFire(int mode) {
CompletableFuture<V> d; CompletableFuture<T> a;
Object r; Throwable x; Function super T,? extends V> f;
if ((a = src) == null || (r = a.result) == null
|| (d = dep) == null || (f = fn) == null)
return null;
tryComplete: if (d.result == null) {
if (r instanceof AltResult) {
if ((x = ((AltResult)r).ex) != null) {
d.completeThrowable(x, r);
break tryComplete;
}
r = null;
}
try {
if (mode <= 0 && !claim())
return null;
else {
@SuppressWarnings("unchecked") T t = (T) r;
d.completeValue(f.apply(t));
}
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
src = null; dep = null; fn = null;
//成功处理完依赖方法和回调方法后进行处理,可能唤醒其他的回调方法或者清理栈
return d.postFire(a, mode);
}

描述 AND 汇聚关系

CompletionStage 接口里面描述 AND 汇聚关系,主要是 thenCombine、thenAcceptBoth 和 runAfterBoth 系列的接口,这些接口的区别是源自 fn、consumer、action 这三个核心参数不同。

public <U,V> CompletionStage<V> thenCombine(CompletionStage extends U> other,BiFunction super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage extends U> other,BiFunction super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage extends U> other,BiFunction super T,? super U,? extends V> fn,Executor executor);
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage extends U> other,BiConsumer super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage extends U> other,BiConsumer super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage extends U> other,BiConsumer super T, ? super U> action, Executor executor);
public CompletionStage<Void> runAfterBoth(CompletionStage other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage other,Runnable action,Executor executor);

Async后缀的方法表示,前面的 CompletionStage 执行完成,在执行后续操作时会提交到线程池处理,否则就还是使用同一个处理线程完成CompletableFuture的所有任务。

这三种方法意思都是等两个 CompletionStage 都完成了计算才会执行下一步的操作,区别在于参数接口类型不一样。

CompletableFuture 中 thenAcceptBoth 如何实现?talk is cheap!

public <U> CompletableFuture<Void> thenAcceptBoth(
CompletionStage extends U> other,
BiConsumer super T, ? super U> action) {
return biAcceptStage(null, other, action);
}
private <U> CompletableFuture<Void> biAcceptStage(
Executor e, CompletionStage<U> o,
BiConsumer super T,? super U> f) {
CompletableFuture<U> b; Object r, s;
if (f == null || (b = o.toCompletableFuture()) == null)
throw new NullPointerException();
CompletableFuture<Void> d = newIncompleteFuture();
// 如果两个阶段有任何一个没有执行完成,则将回调方法分别放到两个互相依赖阶段的栈顶
if ((r = result) == null || (s = b.result) == null)
bipush(b, new BiAccept<T,U>(e, d, this, b, f));
else if (e == null)
// 如果两个依赖的阶段都执行完成则调用回调方法
d.biAccept(r, s, f, null);
else
try {
e.execute(new BiAccept<T,U>(null, d, this, b, f));
} catch (Throwable ex) {
d.result = encodeThrowable(ex);
}
return d;
}

描述 OR 汇聚关系

OR的关系,表示谁运行快就用谁的结果执行下一步操作。

public <U> CompletionStage<U> applyToEither(CompletionStage extends T> other,Function super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage extends T> other,Function super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage extends T> other,Function super T, U> fn,Executor executor);
public CompletionStage<Void> acceptEither(CompletionStage extends T> other,Consumer super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage extends T> other,Consumer super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage extends T> other,Consumer super T> action,Executor executor);
public CompletionStage<Void> runAfterEither(CompletionStage other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage other,Runnable action,Executor executor);

同样也是有Async后缀的表示,当前面的 CompletionStage 执行完成,在执行后续操作时会提交到线程池处理。applyToEither、acceptEither、runAfterEither 三个方法的区别还是来自于不同的接口参数类型:Function、Consumer、Runnable。

CompletableFuture 中 applyToEither 如何实现?

public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage extends T> other, Function super T, U> fn) {
return orApplyStage(defaultExecutor(), other, fn);
}
private <U extends T,V> CompletableFuture<V> orApplyStage(
Executor e, CompletionStage<U> o, Function super T, ? extends V> f) {
CompletableFuture<U> b;
if (f == null || (b = o.toCompletableFuture()) == null)
throw new NullPointerException();
Object r; CompletableFuture extends T> z;
// 这块是重点,有任何一个阶段的结果不为空就直接执行function
if ((r = (z = this).result) != null ||
(r = (z = b).result) != null)
return z.uniApplyNow(r, e, f);
CompletableFuture<V> d = newIncompleteFuture();
// 如果都为空则将回调方法分别push到被依赖的两个阶段的栈顶
orpush(b, new OrApply<T,U,V>(e, d, this, b, f));
return d;
}

异常处理

在Java编程中,异常处理当然是必不可少的一环,那你可能会想到如果在使用 CompletableFuture 进行异步链式编程时,如果出现异常该怎么处理呢?

首先上面我们提到的 fn、consumer、action 它们的核心方法是不允许抛出可检查异常的,但是却无法限制它们抛出运行时异常。在同步方法中,我们可以使用 try-catch{} 来捕获并处理异常,但在异步编程里面异常该如何处理 ?CompletionStage 接口给我们提供的方案非常简单,比 try-catch{} 还要简单。

下面是相关的方法,使用这些方法进行异常处理和串行操作是一样的,都支持链式编程方式。

public CompletableFuture<T> whenComplete(BiConsumer super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
if (new Random().nextInt() % 2 == 0) {
int i = 12 / 0;
}
System.out.println("执行结束!");
});
future.whenComplete(new BiConsumer<Void, Throwable>() {
@Override
public void accept(Void t, Throwable action) {
System.out.println("执行完成!");
}
});
future.exceptionally(new Function<Throwable, Void>() {
@Override
public Void apply(Throwable t) {
System.out.println("执行失败:" + t.getMessage());
return null;
}
}).join();

handle 也是执行任务完成时对结果的处理,whenComplete() 和 handle() 的区别在于 whenComplete() 不支持返回结果,而 handle() 是支持返回结果的。

当上一个的 CompletableFuture 的值计算完成或者抛出异常的时候,会触发 handle 方法中定义的函数,结果由 BiFunction 参数计算而得,因此这组方法兼有 whenComplete 和转换的两个功能。

public <U> CompletionStage<U> handle(BiFunction super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction super T, Throwable, ? extends U> fn,Executor executor);

JDK8流式编程结合

public static List<String> exampleCompletableFutureAndStream() {
ExecutorService executorService = Executors.newCachedThreadPool();
List<String> loads = null;
try {
// 所有需要查询远程服务的load列表
List<String> requestList = Lists.newArrayList("load1", "load2", "load3", "load4");
List<CompletableFuture<String>> completableFutures = requestList.stream()
// 使用CompletableFuture以异步方式查询数据
.map(req -> CompletableFuture.supplyAsync(() -> invokeReq(req), executorService))
.map(future -> future.thenApply(Load::getStatus))
.map(future -> future.thenCompose(status -> CompletableFuture.supplyAsync(() -> status.name().toUpperCase())))
.toList();
loads = completableFutures.stream().map(CompletableFuture::join).toList();
System.out.println(Thread.currentThread().getName() + ": CompletableFuture异步方式查询请求已完成:" + loads.size());

} finally {
executorService.shutdown();
}
return loads;
}

注意到了吗?这里使用了两个不同的Stream流水线,是否可以在同一个处理流的流水线上一个接一个地放置多个map操作。

public static List<String> exampleCompletableFutureAndStream() {
ExecutorService executorService = Executors.newCachedThreadPool();
List<String> loads = null;
try {
// 所有需要查询远程服务的load列表
List<String> requestList = Lists.newArrayList("load1", "load2", "load3", "load4");
loads = requestList.stream()
// 使用CompletableFuture以异步方式查询数据
.map(req -> CompletableFuture.supplyAsync(() -> invokeReq(req), executorService))
.map(future -> future.thenApply(Load::getStatus))
.map(future -> future.thenCompose(status -> CompletableFuture.supplyAsync(() -> status.name().toUpperCase())))
.map(CompletableFuture::join)
.toList();
System.out.println(Thread.currentThread().getName() + ": CompletableFuture异步方式查询请求已完成:" + loads.size());
} finally {
executorService.shutdown();
}
return loads;
}

这其实是有原因的。考虑流操作之间的延迟特性,如果你在单一流水线中处理流,不同的请求只能以同步、顺序执行的方式才会成功。因此,每个创建CompletableFuture对象只能在前一个操作结束之后执行查询指定服务请求的动作、通知join方法返回结果。

再来看一个例子:

我们的系统提供的运费价格是以美元计价的,但是你希望以人民币(RMB)的方式提供给你的客户。你可以用异步的方式向计费中心查询指定Load的价格,同时从远程的汇率服务那里查到人民币和美元之间的汇率。当二者都结束时,再将这两个结果结合起来,用返回的商品价格乘以当时的汇率,得到以人民币计价的商品价格。

public class MultiThreadTest {
@Test
public void test18() {
long start = System.nanoTime();
List<CompletableFuture<Double>> futures = loads.stream()
.map(laod ->
CompletableFuture
// 查商品价格操作和查兑换汇率操作同时进行,当两者都完成时将结果进行整合
.supplyAsync(() -> load.getPrice("load1"))
.thenCombine(CompletableFuture.supplyAsync(() -> RateService.getRate("RMB", "USD")), (price, rate) -> price * rate)
)
.collect(toList());
List<Double> usdPrices = futures.stream()
.map(CompletableFuture::join)
.collect(toList());

}

}

通过上述例子,可以看到相对于采用Java 8之前提供的Future实现,CompletableFuture版本实现所具备的巨大优势。CompletableFuture利用Lambda表达式以声明式的API提供了一种机制,能够用最有效的方式,非常容易地将多个以同步或异步方式执行复杂操作的任务结合到一起。

为了更直观地感受一下使用CompletableFuture在代码可读性上带来的巨大提升,下面尝试仅使用Java 7中提供的特性,重新实现上述例子的功能。

public class MultiThreadTest {
@Test
public void test19() throws ExecutionException, InterruptedException {
long start = System.nanoTime();
List<Future<Double>> usdFuturePrices = new ArrayList<>(shops.size());
for (Shop shop : shops) {
// 创建一个查询人民币到美元转换汇率的Future
final Future<Double> usdFutureRate = executor.submit(new Callable<Double>() {
public Double call() {
return RateService.getRate("RMB", "USD");
}
});
// 在第二个Future中查询指定商店中特定商品的价格
Future<Double> usdFuturePrice = executor.submit(new Callable<Double>() {
public Double call() throws ExecutionException, InterruptedException {
double rmbPrice = shop.getPrice("肥皂");
// 在查找价格操作的同一个Future中, 将价格和汇率做乘法计算出汇后价格
return rmbPrice * usdFutureRate.get();
}
});
usdFuturePrices.add(usdFuturePrice);
}
List<Double> usdPrices = new ArrayList<>(usdFuturePrices.size());
for (Future<Double> usdFuturePrice : usdFuturePrices) {
usdPrices.add(usdFuturePrice.get());
}

}
}

这里我们思考这样一个问题:并行使用流还是CompletableFuture?

对集合进行并行计算有两种方式:要么将其转化为并行流,利用map这样的操作开展工作,要么枚举出集合中的每一个元素,创建新的线程,在 CompletableFuture 内对其进行操作。后者提供了更多的灵活性,你可以调整线程池的大小,而这能帮助你确保整体的计算不会因为线程都在等待I/O而发生阻塞。同时也可以提供更多描述任务之间关系的接口,我们不需要为之编写更多的代码。

这里对使用这些API的建议如下:

总结

今天大家学到了哪些知识呢?

  1. 如何优化接口性能?某些场景下可以使用多线程并行代替串行。
  2. 如何实现接口并行调用?通过今天的学习可以使用 Future+Callable、FutureTask、CompletableFuture。
  3. 详细介绍了CompletableFuture的强大,掌握CompletableFuture提供的函数式编程的能力,以及与JDK8流式编程结合使用,使代码更加美观优雅,写起来简洁和便利。
  4. 在接口设计时可以参考CompletableFuture的实现,将两个无关的接口能力组装在一起以实现更加强大的功能。

不足之处:今天只是对于并发编程中的工具类使用和相关原理做了分享,在实际开发过程中可能需要考虑到更多的通用性,封装通过调用模版方法,不要每一个地方都写一堆类似的代码。

通过今天的分享,希望大家可以在平时开发工作中遇到合适的场景时尝试使用 CompletableFuture 提供的API,优化程序性能、提高开发效率。

那如果都看到了这里,一定要帮七哥给个点赞支持哦~

来源:七哥聊编程内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯