文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

实现异步编程,这个工具类你得掌握!

2024-12-03 06:11

关注

前言

最近看公司代码,多线程编程用的比较多,其中有对CompletableFuture的使用,所以想写篇文章总结下

在日常的Java8项目开发中,CompletableFuture是很强大的并行开发工具,其语法贴近java8的语法风格,与stream一起使用也能大大增加代码的简洁性

大家可以多应用到工作中,提升接口性能,优化代码

基本介绍

CompletableFuture是Java 8新增的一个类,用于异步编程,继承了Future和CompletionStage

这个Future主要具备对请求结果独立处理的功能,CompletionStage用于实现流式处理,实现异步请求的各个阶段组合或链式处理,因此completableFuture能实现整个异步调用接口的扁平化和流式处理,解决原有Future处理一系列链式异步请求时的复杂编码

Future的局限性

Future 的结果在非阻塞的情况下,不能执行更进一步的操作

我们知道,使用Future时只能通过isDone()方法判断任务是否完成,或者通过get()方法阻塞线程等待结果返回,它不能非阻塞的情况下,执行更进一步的操作。

不能组合多个Future的结果

假设你有多个Future异步任务,你希望最快的任务执行完时,或者所有任务都执行完后,进行一些其他操作

多个Future不能组成链式调用

当异步任务之间有依赖关系时,Future不能将一个任务的结果传给另一个异步任务,多个Future无法创建链式的工作流。

没有异常处理

现在使用CompletableFuture能帮助我们完成上面的事情,让我们编写更强大、更优雅的异步程序

基本使用

创建异步任务

通常可以使用下面几个CompletableFuture的静态方法创建一个异步任务

  1. public static CompletableFuture runAsync(Runnable runnable);              //创建无返回值的异步任务 
  2. public static CompletableFuture runAsync(Runnable runnable, Executor executor);     //无返回值,可指定线程池(默认使用ForkJoinPool.commonPool) 
  3. public static  CompletableFuture supplyAsync(Supplier supplier);           //创建有返回值的异步任务 
  4. public static  CompletableFuture supplyAsync(Supplier supplier, Executor executor); //有返回值,可指定线程池 

使用示例:

  1. Executor executor = Executors.newFixedThreadPool(10); 
  2. CompletableFuture future = CompletableFuture.runAsync(() -> { 
  3.     //do something 
  4. }, executor); 
  5. int poiId = 111; 
  6. CompletableFuture future = CompletableFuture.supplyAsync(() -> { 
  7.  PoiDTO poi = poiService.loadById(poiId); 
  8.   return poi.getName(); 
  9. }); 
  10. // Block and get the result of the Future 
  11. String poiName = future.get(); 

使用回调方法

通过future.get()方法获取异步任务的结果,还是会阻塞的等待任务完成

CompletableFuture提供了几个回调方法,可以不阻塞主线程,在异步任务完成后自动执行回调方法中的代码

  1. public CompletableFuture thenRun(Runnable runnable);            //无参数、无返回值 
  2. public CompletableFuture thenAccept(Consumer action);         //接受参数,无返回值 
  3. public  CompletableFuture thenApply(Function fn); //接受参数T,有返回值U 

使用示例:

  1. CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello"
  2.                            .thenRun(() -> System.out.println("do other things. 比如异步打印日志或发送消息")); 
  3. //如果只想在一个CompletableFuture任务执行完后,进行一些后续的处理,不需要返回值,那么可以用thenRun回调方法来完成。 
  4. //如果主线程不依赖thenRun中的代码执行完成,也不需要使用get()方法阻塞主线程。 
  1. CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello"
  2.                            .thenAccept((s) -> System.out.println(s + " world")); 
  3. //输出:Hello world 
  4. //回调方法希望使用异步任务的结果,并不需要返回值,那么可以使用thenAccept方法 
  1. CompletableFuture future = CompletableFuture.supplyAsync(() -> { 
  2.   PoiDTO poi = poiService.loadById(poiId); 
  3.   return poi.getMainCategory(); 
  4. }).thenApply((s) -> isMainPoi(s));   // boolean isMainPoi(int poiId); 
  5.  
  6. future.get(); 
  7. //希望将异步任务的结果做进一步处理,并需要返回值,则使用thenApply方法。 
  8. //如果主线程要获取回调方法的返回,还是要用get()方法阻塞得到 

组合两个异步任务

  1. //thenCompose方法中的异步任务依赖调用该方法的异步任务 
  2. public  CompletableFuture thenCompose(Function> fn);  
  3. //用于两个独立的异步任务都完成的时候 
  4. public  CompletableFuture thenCombine(CompletionStage other,  
  5.                                               BiFunction fn);  

使用示例:

  1. CompletableFutureInteger>> poiFuture = CompletableFuture.supplyAsync( 
  2.   () -> poiService.queryPoiIds(cityId, poiId) 
  3. ); 
  4. //第二个任务是返回CompletableFuture的异步方法 
  5. CompletableFuture> getDeal(List<Integer> poiIds){ 
  6.   return CompletableFuture.supplyAsync(() ->  poiService.queryPoiIds(poiIds)); 
  7. //thenCompose 
  8. CompletableFuture> resultFuture = poiFuture.thenCompose(poiIds -> getDeal(poiIds)); 
  9. resultFuture.get(); 

thenCompose和thenApply的功能类似,两者区别在于thenCompose接受一个返回CompletableFuture的Function,当想从回调方法返回的CompletableFuture中直接获取结果U时,就用thenCompose

如果使用thenApply,返回结果resultFuture的类型是CompletableFuture>>,而不是CompletableFuture>

  1. CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello"
  2.   .thenCombine(CompletableFuture.supplyAsync(() -> "world"), (s1, s2) -> s1 + s2); 
  3. //future.get() 

组合多个CompletableFuture

当需要多个异步任务都完成时,再进行后续处理,可以使用allOf方法

  1. CompletableFuture poiIDTOFuture = CompletableFuture 
  2.  .supplyAsync(() -> poiService.loadPoi(poiId)) 
  3.   .thenAccept(poi -> { 
  4.     model.setModelTitle(poi.getShopName()); 
  5.     //do more thing 
  6.   }); 
  7.  
  8. CompletableFuture productFuture = CompletableFuture 
  9.  .supplyAsync(() -> productService.findAllByPoiIdOrderByUpdateTimeDesc(poiId)) 
  10.   .thenAccept(list -> { 
  11.     model.setDefaultCount(list.size()); 
  12.     model.setMoreDesc("more"); 
  13.   }); 
  14. //future3等更多异步任务,这里就不一一写出来了 
  15.  
  16. CompletableFuture.allOf(poiIDTOFuture, productFuture, future3, ...).join();  //allOf组合所有异步任务,并使用join获取结果 

该方法挺适合C端的业务,比如通过poiId异步的从多个服务拿门店信息,然后组装成自己需要的模型,最后所有门店信息都填充完后返回

这里使用了join方法获取结果,它和get方法一样阻塞的等待任务完成

多个异步任务有任意一个完成时就返回结果,可以使用anyOf方法

  1. CompletableFuture future1 = CompletableFuture.supplyAsync(() -> { 
  2.     try { 
  3.         TimeUnit.SECONDS.sleep(2); 
  4.     } catch (InterruptedException e) { 
  5.        throw new IllegalStateException(e); 
  6.     } 
  7.     return "Result of Future 1"
  8. }); 
  9.  
  10. CompletableFuture future2 = CompletableFuture.supplyAsync(() -> { 
  11.     try { 
  12.         TimeUnit.SECONDS.sleep(1); 
  13.     } catch (InterruptedException e) { 
  14.        throw new IllegalStateException(e); 
  15.     } 
  16.     return "Result of Future 2"
  17. }); 
  18.  
  19. CompletableFuture future3 = CompletableFuture.supplyAsync(() -> { 
  20.     try { 
  21.         TimeUnit.SECONDS.sleep(3); 
  22.     } catch (InterruptedException e) { 
  23.        throw new IllegalStateException(e); 
  24.       return "Result of Future 3"
  25. }); 
  26.  
  27. CompletableFuture anyOfFuture = CompletableFuture.anyOf(future1, future2, future3); 
  28.  
  29. System.out.println(anyOfFuture.get()); // Result of Future 2 
  30. 异常处理

    1. Integer age = -1; 
    2.  
    3. CompletableFuture maturityFuture = CompletableFuture.supplyAsync(() -> { 
    4.   if(age < 0) { 
    5.     throw new IllegalArgumentException("Age can not be negative"); 
    6.   } 
    7.   if(age > 18) { 
    8.     return "Adult"
    9.   } else { 
    10.     return "Child"
    11.   } 
    12. }).exceptionally(ex -> { 
    13.   System.out.println("Oops! We have an exception - " + ex.getMessage()); 
    14.   return "Unknown!"
    15. }).thenAccept(s -> System.out.print(s)); 
    16. //Unkown! 

    exceptionally方法可以处理异步任务的异常,在出现异常时,给异步任务链一个从错误中恢复的机会,可以在这里记录异常或返回一个默认值

    使用handler方法也可以处理异常,并且无论是否发生异常它都会被调用

    1. Integer age = -1; 
    2.  
    3. CompletableFuture maturityFuture = CompletableFuture.supplyAsync(() -> { 
    4.     if(age < 0) { 
    5.         throw new IllegalArgumentException("Age can not be negative"); 
    6.     } 
    7.     if(age > 18) { 
    8.         return "Adult"
    9.     } else { 
    10.         return "Child"
    11.     } 
    12. }).handle((res, ex) -> { 
    13.     if(ex != null) { 
    14.         System.out.println("Oops! We have an exception - " + ex.getMessage()); 
    15.         return "Unknown!"
    16.     } 
    17.     return res; 
    18. }); 

    分片处理

    分片和并行处理:分片借助stream实现,然后通过CompletableFuture实现并行执行,最后做数据聚合(其实也是stream的方法)

    CompletableFuture并不提供单独的分片api,但可以借助stream的分片聚合功能实现

    举个例子:

     

    1. //请求商品数量过多时,做分批异步处理 
    2. List> skuBaseIdsList = ListUtils.partition(skuIdList, 10);//分片 
    3. //并行 
    4. List>> futureList = Lists.newArrayList(); 
    5. for (List skuId : skuBaseIdsList) { 
    6.   CompletableFuture> tmpFuture = getSkuSales(skuId); 
    7.   futureList.add(tmpFuture); 
    8. //聚合 
    9. futureList.stream().map(CompletalbleFuture::join).collent(Collectors.toList()); 

    举个例子

    带大家领略下CompletableFuture异步编程的优势

    这里我们用CompletableFuture实现水泡茶程序

    首先还是需要先完成分工方案,在下面的程序中,我们分了3个任务:

    • 任务1负责洗水壶、烧开水
    • 任务2负责洗茶壶、洗茶杯和拿茶叶
    • 任务3负责泡茶。其中任务3要等待任务1和任务2都完成后才能开始

    下面是代码实现,你先略过runAsync()、supplyAsync()、thenCombine()这些不太熟悉的方法,从大局上看,你会发现:

    • 无需手工维护线程,没有繁琐的手工维护线程的工作,给任务分配线程的工作也不需要我们关注;
    • 语义更清晰,例如 f3 = f1.thenCombine(f2, ()->{}) 能够清晰地表述任务3要等待任务1和任务2都完成后才能开始;
    • 代码更简练并且专注于业务逻辑,几乎所有代码都是业务逻辑相关的
    1. //任务1:洗水壶->烧开水 
    2. CompletableFuture f1 =  
    3.   CompletableFuture.runAsync(()->{ 
    4.   System.out.println("T1:洗水壶..."); 
    5.   sleep(1, TimeUnit.SECONDS); 
    6.  
    7.   System.out.println("T1:烧开水..."); 
    8.   sleep(15, TimeUnit.SECONDS); 
    9. }); 
    10. //任务2:洗茶壶->洗茶杯->拿茶叶 
    11. CompletableFuture f2 =  
    12.   CompletableFuture.supplyAsync(()->{ 
    13.   System.out.println("T2:洗茶壶..."); 
    14.   sleep(1, TimeUnit.SECONDS); 
    15.  
    16.   System.out.println("T2:洗茶杯..."); 
    17.   sleep(2, TimeUnit.SECONDS); 
    18.  
    19.   System.out.println("T2:拿茶叶..."); 
    20.   sleep(1, TimeUnit.SECONDS); 
    21.   return "龙井"
    22. }); 
    23. //任务3:任务1和任务2完成后执行:泡茶 
    24. CompletableFuture f3 =  
    25.   f1.thenCombine(f2, (__, tf)->{ 
    26.     System.out.println("T1:拿到茶叶:" + tf); 
    27.     System.out.println("T1:泡茶..."); 
    28.     return "上茶:" + tf; 
    29.   }); 
    30. //等待任务3执行结果 
    31. System.out.println(f3.join()); 
    32.  
    33. void sleep(int t, TimeUnit u) { 
    34.   try { 
    35.     u.sleep(t); 
    36.   }catch(InterruptedException e){} 

    注意事项

    1.CompletableFuture默认线程池是否满足使用

    前面提到创建CompletableFuture异步任务的静态方法runAsync和supplyAsync等,可以指定使用的线程池,不指定则用CompletableFuture的默认线程池

    1. private static final Executor asyncPool = useCommonPool ? 
    2.         ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); 

    可以看到,CompletableFuture默认线程池是调用ForkJoinPool的commonPool()方法创建,这个默认线程池的核心线程数量根据CPU核数而定,公式为Runtime.getRuntime().availableProcessors() - 1,以4核双槽CPU为例,核心线程数量就是4*2-1=7个

    这样的设置满足CPU密集型的应用,但对于业务都是IO密集型的应用来说,是有风险的,当qps较高时,线程数量可能就设的太少了,会导致线上故障

    所以可以根据业务情况自定义线程池使用

     

    2.get设置超时时间不能串行get,不然会导致接口延时线程数量*超时时间

     

    免责声明:

    ① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

    ② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

    软考中级精品资料免费领

    • 历年真题答案解析
    • 备考技巧名师总结
    • 高频考点精准押题
    • 资料下载
    • 历年真题
    • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

      难度     813人已做
      查看
    • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

      难度     354人已做
      查看
    • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

      难度     318人已做
      查看
    • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

      难度     435人已做
      查看
    • 2024年上半年系统架构设计师考试综合知识真题

      难度     224人已做
      查看

    相关文章

    发现更多好内容
    咦!没有更多了?去看看其它编程学习网 内容吧