文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Java8并发新特性CompletableFuture怎么使用

2023-06-30 18:33

关注

这篇文章主要介绍“Java8并发新特性CompletableFuture怎么使用”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“Java8并发新特性CompletableFuture怎么使用”文章能帮助大家解决问题。

1.CompletableFuture是什么?

各位小伙伴是否有一种感觉,整天大量时间沉迷于业务开发的同时,缺少对于一些技术更新的关注,忽略掉了很多实用又简单的方法,以往我们做异步任务的时候都习惯于使用Callable或者Runnable接口去实现,今天我们就来聊聊与之不同的CompletableFuture类。

CompletableFuture针对Future接口做了改进,相比Callable/Runnable接口它支持多任务进行链式调用、组合、多任务并发处理。很多时候我们在设计过程中会想在一个异步任务执行完成后,直接获取它的结果传递给下一个任务继续执行后续的流程,这时候CompletableFuture的作用就来了。

CompletableFuture类关系图:

从以下类图可以看到,CompletableFuture实现了Future和CompletionStage两个接口,Future提供了获取任务执行结果和任务执行状态的功能。 CompletionStage表示一个任务的执行阶段,提供了诸多方法支持了多任务的聚合功能。

Java8并发新特性CompletableFuture怎么使用

2.CompletableFuture的方法使用说明

2.1 CompletableFuture类提供几个静态方法来进行异步操作

supplyAsync与runAsync主要用于构建异步事件。

supplyAsync带有返回值的异步任务,支持在默认线程池ForkJoinPool.commonPool()中完成异步任务,也可以使用自定义线程池执行异步任务,结果返回一个新的CompletableFuture,返回结果类型U。最终的任务执行结果可通过返回CompletableFuture对象的 get()/join() 方法获取返回值。

// 使用默认线程池public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {...}// 使用自定义线程池Executorpublic static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) {...}// ====================================demo华丽分割线============================================CompletableFuture<String> supplyAsyncFuture = CompletableFuture.supplyAsync(() -> {    log.info("executing supplyAsync task ...");    return "this is supplyAsync";});// 进入阻塞获取异步任务结果log.info(supplyAsyncFuture.get());  // 输出结果:this is supplyAsync

runAsync不带返回值的异步任务,支持在默认线程池ForkJoinPool.commonPool()中完成异步任务,也可以使用自定义线程池执行异步任务,结果返回一个新的CompletableFuture,返回结果类型为Void,也就是无返回值。

public static CompletableFuture<Void> runAsync(Runnable runnable) {...}public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {...}// ====================================demo华丽分割线============================================CompletableFuture<Void> runAsyncFuture = CompletableFuture.runAsync(() -> {    log.info("executing runAsync task ...");});runAsyncFuture.get();

allOf:多个CompletableFuture任务并发执行,所有CompletableFuture任务完成时,返回一个新的CompletableFuture对象,其返回值为Void,也就是无返回值。

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {...}// ====================================demo华丽分割线============================================// allOf,可传递返回值不同类型的future,最终结果按自己设计预期处理即可CompletableFuture<String> cf11 = CompletableFuture.supplyAsync(() -> {    log.info("executing supplyAsync task cf11 ...");    return "this is supplyAsync";});CompletableFuture<String> cf12 = CompletableFuture.supplyAsync(() -> {    log.info("executing supplyAsync task cf12 ...");    return "this is supplyAsync";});CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(cf11, cf12);allOfFuture.get();

anyOf:多个CompletableFuture任务并发执行,只要有一个CompletableFuture任务完成时,就会返回一个新的CompletableFuture对象,并返回该CompletableFuture执行完成任务的返回值。

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {...}// ====================================demo华丽分割线============================================CompletableFuture<String> cf21 = CompletableFuture.supplyAsync(() -> {    log.info("executing supplyAsync task cf21 ...");    return "this is supplyAsync cf21";});CompletableFuture<String> cf22 = CompletableFuture.supplyAsync(() -> {    log.info("executing supplyAsync task cf22 ...");    return "this is supplyAsync cf22";});CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(cf21, cf22);log.info("{}", anyOfFuture.get());  // 输出结果:this is supplyAsync cf21或cf22

2.2 获取异步任务执行结果的方法 get()/join()

join()和get()方法都是CompletableFuture对象基于阻塞的方式来获取异步任务执行结果。

// 可中断,可设置超时时间public T get() throws InterruptedException, ExecutionException {...}public T get(long timeout, TimeUnit unit) throws InterruptedException,                 ExecutionException, TimeoutException {...}public T join() {...}

3.CompletionStage的方法使用说明

CompletionStage表示一个任务的执行阶段,每个任务都会返回一个CompletionStage对象,可以对多个CompletionStage对象进行串行、并行或者聚合的方式来进行下阶段的操作,也就是说实现异步任务的回调功能。CompletionStage总共提供了38个方法来实现多个CompletionStage任务的各种操作, 接下来我们就针对这些方法分类来了解一下。

以下类型均有三种使用方式:

3.1 纯消费类型

// 不使用线程池,仅依赖当前线程执行,不做异步public CompletionStage<Void> thenAccept(Consumer<? super T> action);// 使用默认线程池ForkJoinPool.commonPool执行任务public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);// 使用自定义线程池执行任务public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor);// ====================================demo华丽分割线============================================CompletableFuture.supplyAsync(() -> "this is supplyAsync")        .thenAcceptAsync((result) -> {            log.info("{} thenAcceptAsync", result);        }).join();        // 输出结果:this is supplyAsync thenAcceptAsync

依赖两个任务都完成(thenAcceptBoth):两个CompletionStage任务并发执行,必须都完成了才执行action回调处理,即仅仅消费了两个CompletionStage任务的返回值,回调处理结果无返回值。

public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,                             BiConsumer<? super T, ? super U> action);// 原理同上,使用默认线程池执行异步任务public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,                             BiConsumer<? super T, ? super U> action);// 原理同上,使用自定义线程池执行异步任务public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,                             BiConsumer<? super T, ? super U> action, Executor executor);// ====================================demo华丽分割线============================================CompletableFuture<String> cf311 = CompletableFuture.supplyAsync(() -> "this is supplyAsync cf311");CompletableFuture<String> cf312 = CompletableFuture.supplyAsync(() -> "this is supplyAsync cf312");cf311.thenAcceptBothAsync(cf312, (r1, r2) -> {   log.info("{} and {}", r1, r2);}).join();// 输出结果:this is supplyAsync cf311 and this is supplyAsync cf312

依赖两个任务中的任何一个完成(acceptEither):两个CompletionStage任务并发执行,只要其中一个先完成了就携带返回值执行action回调处理,即仅仅消费了优先完成的CompletionStage任务的返回值,回调处理结果无返回值。

public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,                             Consumer<? super T> action);public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,                             Consumer<? super T> action);public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,                             Consumer<? super T> action, Executor executor);// ====================================demo华丽分割线============================================CompletableFuture<String> cf311 = CompletableFuture.supplyAsync(() -> "this is supplyAsync cf311");CompletableFuture<String> cf312 = CompletableFuture.supplyAsync(() -> "this is supplyAsync cf312");cf311.acceptEitherAsync(cf312, (r) -> {    log.info(r); // 输出结果:this is supplyAsync cf311或cf312}).join();

3.2 有返回值类型

依赖单个任务完成(thenApply):由上一个CompletionStage任务执行完成的结果传递到action进行回调处理,即不止消费了上一个CompletaionStage任务的返回值,同时回调处理结果也有返回值

public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);// ====================================demo华丽分割线============================================CompletableFuture<String> cf32 = CompletableFuture.supplyAsync(() -> "this is supplyAsync")        .thenApplyAsync(result -> result + " and thenApplyAsync");log.info(cf32.join());  // 输出结果:this is supplyAsync and thenApplyAsync

依赖两个任务都完成(thenCombine):两个CompletionStage任务并发执行,必须都完成了才执行action回调处理,即不止消费了两个CompletaionStage任务的返回值,同时回调处理结果也有返回值。

public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,                             BiFunction<? super T,? super U,? extends V> fn);public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,                             BiFunction<? super T,? super U,? extends V> fn);public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,                             BiFunction<? super T,? super U,? extends V> fn, Executor executor);// ====================================demo华丽分割线============================================CompletableFuture<String> cf321 = CompletableFuture.supplyAsync(() -> "this is supplyAsync cf321");CompletableFuture<String> cf322 = CompletableFuture.supplyAsync(() -> "this is supplyAsync cf322");CompletableFuture<String> thenCombineFuture = cf321.thenCombineAsync(cf322, (r1, r2) -> {    return r1 + " and " + r2;});log.info(thenCombineFuture.join());// 输出结果:this is supplyAsync cf321 and this is supplyAsync cf322

依赖两个任务中的任何一个完成(applyToEither):两个CompletionStage任务并发执行,只要其中一个任务执行完成就会action回调处理,即不止消费了优先完成的CompletionStage的返回值,同时回调处理结果也有返回值。

// 原理同3.1的acceptEither,只不过applyToEither任务执行完成会返回一个带有返回值的CompletionStagepublic <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,                             Function<? super T, U> fn);public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,                             Function<? super T, U> fn);public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,                             Function<? super T, U> fn, Executor executor);// ====================================demo华丽分割线============================================CompletableFuture<String> cf321 = CompletableFuture.supplyAsync(() -> "this is supplyAsync cf321");CompletableFuture<String> cf322 = CompletableFuture.supplyAsync(() -> "this is supplyAsync cf322");CompletableFuture<String> thenCombineFuture = cf321.applyToEitherAsync(cf322, (r) -> {    return r;});log.info(thenCombineFuture.join());// 输出结果:this is supplyAsync cf321或cf322

3.3 不消费也不返回类型

依赖单个任务完成(thenRun):单个CompletionStage任务执行完成回调action处理,即执行action回调方法无参数,回调处理结果也无返回值。

// 上一个CompletionStage任务执行完成后直接回调action处理,无返回值public CompletionStage<Void> thenRun(Runnable action);// 同上,使用默认线程池执行action处理public CompletionStage<Void> thenRunAsync(Runnable action);// 同上,使用自定义线程池执行action处理public CompletionStage<Void> thenRunAsync(Runnable action, Executor executor);// ====================================demo华丽分割线============================================CompletableFuture.runAsync(() -> {    // TODO}).thenRunAsync(() -> {    log.info("this is thenRunAsync");  // 输出结果:this is thenRunAsync}).join();

依赖两个任务都完成(runAfterBoth):两个CompletionStage任务并发执行,必须两个任务都完成才执行action回调处理,即执行action回调方法无参数,回调处理结果也无返回值。

// 原理同3.1的thenAcceptBoth,只不过runAfterBoth的action回调处理不接收参数且任务执行完成无返回值public CompletionStage<Void> runAfterBoth(CompletionStage<?> other, Runnable action);// 同上,使用默认线程池执行action处理public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action);// 同上,使用自定义线程池执行action处理public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor);// ====================================demo华丽分割线============================================CompletableFuture<String> cf331 = CompletableFuture.supplyAsync(() -> "this is supplyAsync cf331");CompletableFuture<String> cf332 = CompletableFuture.supplyAsync(() -> "this is supplyAsync cf332");cf331.runAfterBoth(cf332, () -> {    log.info("this is runAfterBoth");}).join();// 输出结果:this is runAfterBoth

依赖两个任务中的任何一个完成(runAfterEither):两个CompletionStage任务并发执行,只需其中任何一个任务完成即可回调action处理,即执行action回调方法无参数,回调处理结果也无返回值。

public CompletionStage<Void> runAfterEither(CompletionStage<?> other, Runnable action);public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action);public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor);// ====================================demo华丽分割线============================================CompletableFuture<String> cf331 = CompletableFuture.supplyAsync(() -> "this is supplyAsync cf331");CompletableFuture<String> cf332 = CompletableFuture.supplyAsync(() -> "this is supplyAsync cf332");cf331.runAfterEitherAsync(cf332, () -> {    log.info("this is runAfterEitherAsync");}).join();// 输出结果:this is runAfterEitherAsync

3.4 组合类型

thenCompose:存在先后关系的两个任务进行串行组合,由第一个CompletionStage任务执行结果作为参数传递给第二个CompletionStage任务,最终返回第二个CompletionStage。

public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn);public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor);// ====================================demo华丽分割线============================================CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> {    return "this is supplyAsync";});CompletableFuture<String> thenComposeFuture = supplyFuture.thenComposeAsync((r) -> {    return CompletableFuture.supplyAsync(() -> {        return r + " and this is thenComposeAsync";    });});log.info(thenComposeFuture.join());// 输出结果:this is supplyAsync and this is thenComposeAsync

3.5 任务事件类型

CompletionStage接口也支持类似我们常用的try-catch-finally中的finally的作用,无论这个任务的执行结果是正常还是出现异常的情况,都必须要去执行的一个代码块。在CompletionStage接口提供了以下两种接口回调的形式(whenComplete、handle),并支持主线程同步执行同时也支持使用默认线程池,或者使用自定义线程池去异步执行最终的回调处理。例如我们一个事务操作,无论这段代码执行是否成功,我们都必须要去关闭事务。

任务完成事件(whenComplete):结果无返回值,若出现异常执行完whenComplete回调处理完成后将中断主线程的运行

// 1.whenComplete回调函数中Throwable对象不对空代表出现异常,为空则表示无异常public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor);// ====================================demo华丽分割线============================================CompletableFuture<String> whenCompleteFufute = CompletableFuture.supplyAsync(() -> {    int a = 0;    int b = 100 / a;    return "this is supplyAsync normal";}).whenCompleteAsync((r, th) -> {    if (th != null) {        log.error("this is whenCompleteAsync error");    }    else {        log.info("this is whenCompleteAsync success");    }});log.info(whenCompleteFufute.join());  // 输出结果:this is whenCompleteAsync error

任务完成回调事件(handle):结果有返回值,若出现异常执行完handle回调处理完成后将继续执行主线程的后续操作,不中断主线程运行

// 2.handle回调函数中Throwable对象不对空代表出现异常,为空则表示无异常public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor);// ====================================demo华丽分割线============================================CompletableFuture<String> whenCompleteFufute = CompletableFuture.supplyAsync(() -> {    int a = 0;    int b = 100 / a;    return "this is supplyAsync normal";}).handleAsync((r, th) -> {    if (th != null) {        return "this is handleAsync error";    }    else {        return "this is handleAsync success";    }});log.info(whenCompleteFufute.join());// 输出结果:this is handleAsync errorlog.info("main thread is running");// 输出结果:main thread is running

4.CompletionStage异常处理方法

exceptionally:只要是个程序,就会有异常出现的情况,例如一个CompletionStage任务,如果执行过程中出现异常,我们为了保证异常情况下程序能够正常处理业务逻辑,那么在这里我们就可以使用exceptionally进行异常回调处理。当CompletionStage任务出现异常时就会触发回调exceptionally,否则CompletionStage任务正常执行业务不进行异常回调处理。

public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);// ====================================demo华丽分割线============================================CompletableFuture<String> exceptionallyFuture = CompletableFuture.supplyAsync(() -> {    int a = 0;    int b = 10 / a;  // 除数为0将抛异常    return "this is supplyAsync normal";}).exceptionally(th -> {    log.error("exception:{}", th.getMessage());    return "this is exceptionally";});log.info(exceptionallyFuture.join());  // 输出结果:this is exceptionally

:以下这两种情况可能大家在实际开发过程中会比较少见,但还是得在这里做个提醒,以免到最后准备不充分出现设计上的缺陷。

关于“Java8并发新特性CompletableFuture怎么使用”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识,可以关注编程网行业资讯频道,小编每天都会为大家更新不同的知识点。

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯