文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

并发编程 | 从Future到CompletableFuture - 简化 Java 中的异步编程

2023-08-19 15:01

关注

引言

在并发编程中,我们经常需要处理多线程的任务,这些任务往往具有依赖性,异步性,且需要在所有任务完成后获取结果。Java 8 引入了 CompletableFuture 类,它带来了一种新的编程模式,让我们能够以函数式编程的方式处理并发任务,显著提升了代码的可读性和简洁性。
在这篇博客中,我们将深入探讨 CompletableFuture 的设计原理,详细介绍其 API 的使用方式,并通过具体的示例来展示其在并发任务处理中的应用。我们也将探讨其与 Future,CompletableFuture 以及 Java 并发包中其他工具的对比,理解何时以及为什么需要使用 CompletableFuture。让我们一起踏上这个富有挑战性的学习之旅吧!


在开始之前,我们先来回顾一下Java语言发展历史

Java 并发编程的演进

自从诞生以来,Java 就一直致力于提供强大的并发和异步编程工具。在最初的 JDK 1.4 时期,Java 开发者需要使用低级的并发控制工具,如 synchronized 和 wait/notify,这些工具虽然功能强大,但使用起来非常复杂。
为了简化并发编程,Java 在 JDK 1.5 中引入了JUC包,提供了一系列高级的并发控制工具,如 ExecutorService、Semaphore 和 Future。

我们先来看下,Future到底是怎么进行异步编程的

Future的异步编程之旅

在开始我们的旅程之前,我们先看看一下这个需求。

一个复杂的需求

假设你正在为一家在线旅行社工作,用户可以在网站上搜索并预订飞机票和酒店。以下是你需要处理的一系列操作:

  1. 根据用户的搜索条件,查询所有可用的飞机票
  2. 对每一个飞机票,查询与之匹配的可用酒店
  3. 对每一个飞机票和酒店的组合,计算总价格
  4. 将所有的飞机票和酒店的组合按照价格排序
  5. 将结果返回给用户

实现

为了实现这个需求,首先,我们需要创建一个 ExecutorService,:

ExecutorService executor = Executors.newFixedThreadPool(10);
// 1. 查询飞机票Future<List<Flight>> futureFlights = executor.submit(() -> searchFlights(searchCondition));List<Flight> flights;try {    flights = futureFlights.get();} catch (InterruptedException | ExecutionException e) {    // 处理异常}// 2. 对每个飞机票查询酒店List<Future<List<Hotel>>> futureHotelsList = new ArrayList<>();for (Flight flight : flights) {    Future<List<Hotel>> futureHotels = executor.submit(() -> searchHotels(flight));    futureHotelsList.add(futureHotels);}List<Future<List<TravelPackage>>> futureTravelPackagesList = new ArrayList<>();for (Future<List<Hotel>> futureHotels : futureHotelsList) {    List<Hotel> hotels;    try {        hotels = futureHotels.get();    } catch (InterruptedException | ExecutionException e) {        // 处理异常    }        // 3. 对每个飞机票和酒店的组合计算总价格    for (Hotel hotel : hotels) {        Future<List<TravelPackage>> futureTravelPackages = executor.submit(() -> calculatePrices(flight, hotel));        futureTravelPackagesList.add(futureTravelPackages);    }}List<TravelPackage> travelPackages = new ArrayList<>();for (Future<List<TravelPackage>> futureTravelPackages : futureTravelPackagesList) {    try {        travelPackages.addAll(futureTravelPackages.get());    } catch (InterruptedException | ExecutionException e) {        // 处理异常    }}// 4. 将所有的旅行套餐按照价格排序travelPackages.sort(Comparator.comparing(TravelPackage::getPrice));// 5. 返回结果return travelPackages;

需求终于做完了(叹气声)。此时此刻,生在JDK8+的你,会不会感同身受呢。这还是在没有处理异常,没有很多业务代码的前提下。好,现在缓一下我们继续。我们可以从上面代码最直观的看到什么?


再完美的表达,也敌不过一个让你直观感受的例子。接下来,我们来分析一下Future的缺点。

分析这趟Future异步编程之旅

从上面的 Future 的例子中,我们可以明显看到以下几点缺点:

回调地狱

Future 的实现使得我们必须在每一个 Future 完成后启动另一个 Future,这使得代码看起来像是在不断嵌套回调。这种方式会使得代码难以阅读和理解,特别是在涉及复杂的异步任务链时。

阻塞操作

虽然 Future.get() 可以得到任务的结果,但这是一个阻塞操作,它会阻止当前线程的执行,直到异步操作完成。这种设计对于要实现非阻塞的异步编程来说,是非常不理想的。

复杂的错误处理

在使用 Future 链式处理异步任务时,如果中间某个环节出现错误,错误处理的复杂性就会大大增加。你需要在每个 Future 的处理过程中都增加异常处理代码,这使得代码变得更加复杂和难以维护。

无法表示任务间复杂关系

使用 Future 很难直观地表示出任务之间的依赖关系。例如,你无法使用 Future 来表示某个任务需要在另外两个任务都完成后才能开始,或者表示多个任务可以并行执行但是必须在一个共同的任务之前完成。这种限制使得 Future 在处理复杂的异步任务链时变得非常困难。

因此,为了解决这些问题,CompletableFuture 被引入了 Java 8,提供了更强大和灵活的异步编程工具。


CompletableFuture的异步编程之旅

同样还是上面的例子,我们来看下它的实现代码:

CompletableFuture.supplyAsync(() -> searchFlights())  // 1. 查询飞机票                .thenCompose(flights -> {  // 2. 对每个飞机票查询酒店                    List<CompletableFuture<List<TravelPackage>>> travelPackageFutures = flights.stream().map(flight -> CompletableFuture.supplyAsync(() -> searchHotels(flight))  // 查询酒店        .thenCompose(hotels -> {  // 3. 对每个飞机票和酒店的组合计算总价格            List<CompletableFuture<TravelPackage>> packageFutures = hotels.stream()                    .map(hotel -> CompletableFuture.supplyAsync(() -> new TravelPackage(flight, hotel)))                    .collect(Collectors.toList());            return CompletableFuture.allOf(packageFutures.toArray(new CompletableFuture[0]))                    .thenApply(v -> packageFutures.stream().map(CompletableFuture::join).collect(Collectors.toList()));        })).collect(Collectors.toList());                    return CompletableFuture.allOf(travelPackageFutures.toArray(new CompletableFuture[0])).thenApply(v -> travelPackageFutures.stream()        .flatMap(future -> future.join().stream())        .collect(Collectors.toList()));                })                .thenApply(travelPackages -> {  // 4. 将所有的旅行套餐按照价格排序                    return travelPackages.stream().sorted(Comparator.comparing(TravelPackage::getPrice)).collect(Collectors.toList());                })                .exceptionally(e -> {  // 处理所有的异常                    // 处理异常                    return null;                });

你可能乍一看,感觉怎么比Future还要复杂。但是实际在业务中,它反而更加容易读懂。每一步,每一个操作都可以顺着thenCompose下去。


分析这趟CompletableFuture异步编程之旅

CompletableFuture 是 Java 8 中引入的,用于解决在使用 Future 时遇到的一些问题。它实现了 FutureCompletionStage 接口,并且提供了大量的方法来帮助你更好地控制和管理异步操作。我们来结合上面的例子来分析它的优点:

链式编程

我们使用 CompletableFuture 中的 supplyAsync 方法来异步地开始查询航班的操作:

    CompletableFuture<List<Flight>> flightsFuture = CompletableFuture.supplyAsync(() ->         searchFlights(source, destination));

然后,我们使用 thenCompose 方法将查询航班和查询酒店的操作连在一起:

    CompletableFuture<List<TravelPackage>> travelPackagesFuture = flightsFuture.thenCompose(flights ->        CompletableFuture.supplyAsync(() -> flights.stream()            .map(flight -> searchHotels(flight))            .collect(Collectors.toList())        ));

非阻塞操作

上述的 thenCompose 方法是非阻塞的,即查询酒店的操作会立即开始,而不需要等待查询航班的操作完成。

异常处理

我们使用 exceptionally 方法处理查询航班和查询酒店过程中可能出现的异常:

    CompletableFuture<List<TravelPackage>> travelPackagesFuture = flightsFuture.thenCompose(flights ->        CompletableFuture.supplyAsync(() -> flights.stream()            .map(flight -> searchHotels(flight))            .collect(Collectors.toList())        )).exceptionally(ex -> {            System.out.println("失败了: " + ex);            return new ArrayList<>();        });

表示任务间复杂关系

我们使用 CompletableFuture.allOf 方法来表示所有的旅行套餐计算任务都必须在开始排序之前完成:

    CompletableFuture<List<TravelPackage>> sortedTravelPackagesFuture = travelPackagesFuture.thenApply(travelPackages ->        travelPackages.stream()            .flatMap(List::stream)            .sorted(Comparator.comparing(TravelPackage::getPrice))            .collect(Collectors.toList())        );

暂停一分钟,再细细体会上面的例子。我们接着来集中比较这两者

CompletableFuture与Future的比较

异步执行与结果获取

链式操作

异常处理

任务组合

灵活的任务执行控制

假如有一个面试官现在问题它们两者的区别,你会回答了吗? 接下来,我们来解析一下


进阶 | 理解CompletableFuture原理

为了让你理解的不那么晦涩,我为你讲生活中的例子:

我们可以把 CompletableFuture 想象成一家装配线生产车间。每一件零件(任务)的加工完成(Future 完成)都可能会触发下一步工作(下一步的操作),而每一步工作的完成都会通知车间(Future),以便开始下一个阶段的生产。这个过程就像一条流水线,每完成一个步骤就自动进行下一个。

带着这个场景,我们接着往下看。

任务链

CompletableFuture 的源码中,有一个内部类 Completion,代表了任务链中的一项任务。每当一个任务完成时,它都会尝试去完成依赖于它的任务,就像流水线上的工人完成了一部分工作后,就会把半成品传递给下一个工人。

    abstract static class Completion extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask {        // ...    }

结果容器

CompletableFuture 本身就是一个结果容器,它持有了执行的结果,包括正常的计算结果或者执行过程中出现的异常。

    volatile Object result; // The outcome of the computation

工作线程

所有的异步任务都会提交到 ForkJoinPool.commonPool() 中进行执行,当然也可以指定自定义的 Executor 来执行任务。

    static final ForkJoinPool ASYNC_POOL = ForkJoinPool.commonPool();

任务触发

当一个任务完成后,CompletableFuture 会通过 tryFire 方法触发与之关联的下一个任务。这就好比工人完成了一部分工作后,通知流水线的下一位工人继续完成接下来的工作。

 final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {        // ...        if (a != null && a.stack != null) {            if (mode < 0)                a.cleanStack();            else                a.postComplete();        }        if (b != null && b.stack != null) {            if (mode < 0)                b.cleanStack();            else                b.postComplete();        }        return null;    }

是不是有点理解了呢?我可以肯定的说,你已经超过80%的人了!


CompletableFuture的主要方法

细心的你肯定发现了,CompletableFuture大多数方法都实现于一个CompletionStage接口。当然,我在这里可以为你把所有方法都试过一遍,但是你肯定会看的特别累。这样!我把上面需求中所用到的方法都为你讲解,剩下的请你结合网上的案例学习。

supplyAsync()方法

这个方法用于异步执行一个供应函数,并返回一个CompletableFuture对象。在我们的示例中,这个方法用于启动一个异步任务来查找航班。

    CompletableFuture<List<Flight>> flightsFuture = CompletableFuture.supplyAsync(() -> searchFlights(destination));

thenCompose()方法

这个方法用于链接多个CompletableFuture对象,形成一个操作链。当一个操作完成后,thenCompose()方法会将操作的结果传递给下一个操作。在我们的示例中,这个方法用于在找到航班之后查找酒店。

    CompletableFuture<List<Hotel>> hotelsFuture = flightsFuture.thenCompose(flights -> CompletableFuture.supplyAsync(() -> searchHotels(destination)));

thenCombine()方法

这个方法用于将两个独立的CompletableFuture对象的结果合并为一个结果。在我们的示例中,这个方法用于将查找航班和酒店的结果合并为一个旅行套餐。

    CompletableFuture<List<TravelPackage>> travelPackagesFuture = flightsFuture.thenCombine(hotelsFuture, (flights, hotels) -> createTravelPackages(flights, hotels));

thenAccept()方法

这个方法在CompletableFuture对象完成计算后执行一个消费函数,接收计算结果作为参数,不返回新的计算值。在我们的示例中,这个方法用于打印出所有的旅行套餐。

    travelPackagesFuture.thenAccept(travelPackages -> printTravelPackages(travelPackages));

allOf()方法

这个方法用于将一个CompletableFuture对象的数组组合成一个新的CompletableFuture对象,这个新的CompletableFuture对象在数组中所有的CompletableFuture对象都完成时完成。在我们的示例中,这个方法用于将每个航班与每个酒店的组合结果(也就是旅行套餐)组合在一起。

    CompletableFuture.allOf(packageFutures.toArray(new CompletableFuture[0]))                    .thenApply(v -> packageFutures.stream().map(CompletableFuture::join).collect(Collectors.toList()));

thenApply()方法

这个方法用于对CompletableFuture的结果进行变换,并返回一个新的CompletableFuture对象。在我们的示例中,这个方法用于将查询到的旅行套餐按照价格进行排序。

    .thenApply(travelPackages -> {  // 4. 将所有的旅行套餐按照价格排序                    return travelPackages.stream().sorted(Comparator.comparing(TravelPackage::getPrice)).collect(Collectors.toList());                })

exceptionally()方法

这个方法用于处理CompletableFuture的异常情况。如果CompletableFuture的计算过程中抛出异常,那么这个方法会被调用。在我们的示例中,这个方法用于处理查询旅行套餐过程中可能出现的任何异常。

    .exceptionally(e -> {  // 处理所有的异常                    // 处理异常                    return null;                });

当然,这些方法已经够你用了。除非这个需求比我想得还复杂,那算你厉害。哦,不对,算需求变态。现在,你可以挥起历史的毛笔续写了吗?


Java 并发编程的续章

JDK 1.5 的 Future 解决了许多并发编程的复杂性,但是它仍有一些局限性。Future 只能描述一个异步操作,并不能描述一个由多个步骤组成的异步操作。例如,当需要处理一个由多个异步操作序列组成的业务流程时,你可能会发现你的代码被复杂的回调逻辑淹没,这就是人们常说的回调地狱。此外,Future 没有提供一种有效的方式来处理异步操作的结果,你只能通过阻塞调用 get() 方法来获取结果。
为了解决这些问题,Java 在 JDK 1.8 中引入了 CompletableFuture。CompletableFuture 是 Future 的增强版,它不仅能表示一个异步操作,还可以通过 thenCompose(), thenCombine(), allOf() 等方法来描述一个由多个步骤组成的异步操作。通过这些方法,CompletableFuture 能以流畅的链式调用的方式来描述复杂的异步业务流程,这大大简化了异步编程的复杂性。


常见面试题

请解释一下 Future 接口在 Java 中的用途?

解释一下 Future 的局限性是什么?

请解释一下 CompletableFuture 的用途以及它如何克服 Future 的局限性?

如何用 CompletableFuture 来表示一组并行的异步操作?

请解释一下 CompletableFuture 的 thenApply(),thenCompose(),和 thenCombine() 方法的作用及区别?

如果你有一个耗时的异步操作需要执行,但是你又不希望调用 get() 方法时阻塞,你可以使用 CompletableFuture 的哪个方法来达到这个目的?

如何处理 CompletableFuture 的异常?

请解释一下 CompletableFuture 的工作原理?

阅读完文章的你,是否可以回答这些问题呢?我在留言等你。

总结

好了,到这里就结束了,我们来回顾一下。首先,我带你回顾了一下Java并发世界的编年史。紧接着,我带你体验了一下古人经常使用的Future。感到它的不妙之后,我带你回到CompletableFuture 。紧接着有深入了解了它的全貌以及使用方法。最后,希望阅读到这里的你,不要忘记回答问题哦。

来源地址:https://blog.csdn.net/yehenaladonggui/article/details/131695205

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯