文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

强大的异步任务处理类CompletableFuture使用详解

2024-11-30 01:19

关注
在Java 8中, 新增加了一个CompletableFuture类,该类提供了差不多50个左右的方法(都是用来完成各种异步场景需求),并且结合了Future的优点(继承自Future类),提供了比Future更为强大的功能,这使得在异步编程方面变的简单,同时还提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的各种方法。

Future基本应用

Future是从JDK1.5开始有的,目的是获取异步任务执行的结果,通常情况会结合ExecutorService及Callable一起使用。

1. Future结合Callable使用

单任务执行

private static class Task implements Callable {


  @Override
  public String call() throws Exception {
    TimeUnit.SECONDS.sleep(3) ;
    return "success";
  }


}
public static void main(String[] args) throws Exception {
  ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)) ;
  Future future = executor.submit(new Task()) ;
  String result = future.get() ;
  System.out.println("执行结果:" + result) ;
}

当执行到future.get()方法的时候会阻塞,等待3s后继续执行。

多个任务同时执行

private static class Task implements Callable {
  private int sleep ;
  public Task(int sleep) {
    this.sleep = sleep ;
  }


  @Override
  public String call() throws Exception {
    TimeUnit.SECONDS.sleep(this.sleep) ;
    return "success";
  }
}
public static void main(String[] args) throws Exception {
  ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)) ;
  Future future1 = executor.submit(new Task(3)) ;
  Future future2 = executor.submit(new Task(2)) ;
  Future future3 = executor.submit(new Task(1)) ;
  String result1 = future1.get() ;
  String result2 = future2.get() ;
  String result3 = future3.get() ;
  System.out.println("result1:" + result1 + "\t" + "result2:" + result2 + "\t" + "result3:" + result3) ;
}

以上代码执行的3个任务分别用时3,2,1s。future1用时最长。

从运行的结果看到即便future2, future3执行时间短也必须等待future1执行完后才会继续,虽然你可以倒过来获取结果,但是在实际项目中的应用你应该是不能确认每个任务执行需要多长时间,谁先执行完就先获取谁。

虽然这种同步阻塞的方式在有些场景下还是很有必要的。但由于它的同步阻塞导致了当前线程不能干其它的事必须一致等待。

CompletionService解决Future的缺点

CompletionService是一边生产新的任务,一边处理已经完成的任务。简单地说就是CompletionService不管任务执行先后顺序,谁先执行完就处理谁。

private static class Task implements Callable {
  private int time;
  private String name ;
  public Task(int time, String name) {
    this.time = time ;
    this.name = name ;
  }
  @Override
  public String call() throws Exception {
    TimeUnit.SECONDS.sleep(this.time) ;
    return name ;
  }


}
public static void main(String[] args) throws Exception {
  ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)) ;
  CompletionService cs = new ExecutorCompletionService<>(pool) ;
  cs.submit(new Task(3, "name" + 3)) ;
  cs.submit(new Task(1, "name" + 1)) ;
  cs.submit(new Task(2, "name" + 2)) ;
  for (int i = 0; i < 3; i++) {
    System.out.println(cs.take().get()) ;
  }
}

通过执行结果发现,任务的结果获取是以谁先执行完处理谁与任务的执行先后没有关系。

2. CompletableFuture异步编程

CompletableFuture通过如下4个静态方法来执行异步任务

图片

2.1 简单异步任务链式调用执行

ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)) ;
CompletableFuture.runAsync(() -> {
  try {
    TimeUnit.SECONDS.sleep(3) ;
    System.out.println(Thread.currentThread().getName() + ", 1 任务执行完成") ;
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
}, executor).thenRun(() -> {
  try {
    TimeUnit.SECONDS.sleep(2) ;
    System.out.println(Thread.currentThread().getName() + ", 2 任务执行完成") ;
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
}) ;
System.out.println("主线程:" + Thread.currentThread().getName()) ;
executor.shutdown() ;

执行结果:

图片

2.2 获取上一步任务执行结果及任务完成处理

CompletableFuture.supplyAsync(() -> {
  try {
    TimeUnit.SECONDS.sleep(3) ;
    System.out.println(Thread.currentThread().getName() + ", 1 任务执行完成") ;
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return "1" ;
}, executor).thenApply(res -> {
  System.out.println("获取到上一步任务执行结果:" + res) ;
  try {
    TimeUnit.SECONDS.sleep(2) ;
    System.out.println(Thread.currentThread().getName() + ", 2 任务执行完成") ;
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return "2" ;
}).whenComplete((res, tx) -> {
  System.out.println("获取到结果:" + res) ;
  if (tx != null) {
    System.err.println("发生错误了:" + tx.getMessage()) ;
  }
  executor.shutdown();
}) ;
System.out.println("主线程:" + Thread.currentThread().getName()) ;

执行结果:

图片

这里如果任务执行的时候发生了异常那么在whenComplete方法中的res 会为空,tx为发生异常的对象。没有异常时res有执行的机构,tx异常对象为空。

2.3 异步任务异常处理

CompletableFuture.supplyAsync(() -> {
  try {
    TimeUnit.SECONDS.sleep(3) ;
    System.out.println(Thread.currentThread().getName() + ", 1 任务执行完成") ;
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return "1" ;
}, executor).thenApply(res -> {
  System.out.println("获取到上一步任务执行结果:" + res) ;
  try {
    TimeUnit.SECONDS.sleep(2) ;
    System.out.println(Thread.currentThread().getName() + ", 2 任务执行完成") ;
    System.out.println(1 / 0) ;
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return "2" ;
}).exceptionally(tx -> {
  System.out.println(Thread.currentThread().getName() + ", 任务执行发生了异常") ;
  return "error" ;
}).whenComplete((res, tx) -> {
  System.out.println("获取到结果:" + res) ;
  if (tx != null) {
    System.err.println("发生错误了:" + tx.getMessage()) ;
  }
  executor.shutdown();
}) ;
System.out.println("主线程:" + Thread.currentThread().getName()) ;

这里我们人为的制造异常 1 / 0 。

执行结果:

图片

根据执行结果当发生异常时进入exceptionally方法,最终进入whenComplete方法此时 tx异常对象是发生异常的异常对象。

2.4 所有任务完成才算完成任务

CompletableFuture.allOf

CompletableFuture calc1 = CompletableFuture.supplyAsync(() -> {
  try {
    TimeUnit.SECONDS.sleep(2) ;
    System.out.println(Thread.currentThread().getName() + ", calc1任务执行完成") ;
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return 10D ;
}, executor) ;


CompletableFuture calc2 = CompletableFuture.supplyAsync(() -> {
  try {
    TimeUnit.SECONDS.sleep(5) ;
    System.out.println(Thread.currentThread().getName() + ", calc2任务执行完成") ;
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return 20D ;
}, executor) ;
// 当任何一个任务发生异常,这里的tx都不会为null
CompletableFuture.allOf(calc1, calc2).whenComplete((res, tx) -> {
  System.out.println("获取到结果:" + res + ", " + tx) ;
  try {
    System.out.println(calc1.get()) ;
    System.out.println(calc2.get()) ;
  } catch (InterruptedException e) {
    e.printStackTrace();
  } catch (ExecutionException e) {
    e.printStackTrace();
  }
  executor.shutdown();
}) ;

执行结果:

图片

在这里whenComplete中的res是没有结果的,要获取数据我们的分别调用get方法获取。

2.5 handle方法对结果处理

CompletableFuture.supplyAsync(() -> {
  try {
    TimeUnit.SECONDS.sleep(2) ;
    System.out.println(Thread.currentThread().getName() + ", 1 任务执行完成") ;
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return "0" ;
}, executor).handle((res, tx) -> {
  // 处理结果数据
  return res + "1" ;
}).whenComplete((res, tx) -> {
  System.out.println("获取到结果:" + res) ;
  if (tx != null) {
    System.err.println("发生错误了:" + tx.getMessage()) ;
  }
  executor.shutdown();
}) ;

执行结果:

正确

图片

发生异常时:

当发生异常时handle方法中的res是没有值的,tx异常对象为发生异常的异常对象。

2.6 合并异步任务

将两个异步任务完成后合并处理

CompletableFuture.thenCombine

CompletableFuture task1 = CompletableFuture.supplyAsync(() -> {
  try {
    TimeUnit.SECONDS.sleep(2) ;
    System.out.println(Thread.currentThread().getName() + ", 任务1执行完成") ;
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return 10d ;
}, executor) ;
CompletableFuture task2 = CompletableFuture.supplyAsync(() -> {
  try {
    TimeUnit.SECONDS.sleep(2) ;
    System.out.println(Thread.currentThread().getName() + ", 任务2执行完成") ;
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return 20d ;
}, executor) ;
task1.thenCombine(task2, (t1, t2) -> {
  System.out.println(Thread.currentThread().getName() + ", 合并任务完成") ;
  return t1 + "," + t2 ;
}).whenComplete((res, tx) -> {
  System.out.println("获取到结果:" + res) ;
  if (tx != null) {
    System.err.println("发生错误了:" + tx.getMessage()) ;
  }
  executor.shutdown();
}) ;

执行结果:

图片

2.7 异步任务谁快谁就进入下一步的执行

CompletableFuture.applyToEither

两个异步任务谁先执行完谁就继续执行后续的操作。

CompletableFuture task1 = CompletableFuture.supplyAsync(() -> {
  try {
    TimeUnit.SECONDS.sleep(2) ;
    System.out.println(Thread.currentThread().getName() + ", 任务1执行完成") ;
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return 10d ;
}, executor) ;
CompletableFuture task2 = CompletableFuture.supplyAsync(() -> {
  try {
    TimeUnit.SECONDS.sleep(2) ;
    System.out.println(Thread.currentThread().getName() + ", 任务2执行完成") ;
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return 20d ;
}, executor) ;
task1.applyToEither(task2, res -> {
  return res ;
}).whenComplete((res, tx) -> {
  System.out.println("获取到结果:" + res) ;
  if (tx != null) {
    System.err.println("发生错误了:" + tx.getMessage()) ;
  }
  executor.shutdown();
}) ;

执行结果:

图片

2.8 两个异步任务都执行完了才继续执行

只有两个任务都执行完成了后才会继续。

CompletableFuture.runAfterBoth

CompletableFuture task1 = CompletableFuture.supplyAsync(() -> {
  try {
    TimeUnit.SECONDS.sleep(2) ;
    System.out.println(Thread.currentThread().getName() + ", 任务1执行完成") ;
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return 10d ;
}, executor) ;
CompletableFuture task2 = CompletableFuture.supplyAsync(() -> {
  try {
    TimeUnit.SECONDS.sleep(2) ;
    System.out.println(Thread.currentThread().getName() + ", 任务2执行完成") ;
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return 20d ;
}, executor) ;
task1.runAfterBoth(task2, () -> {
  System.out.println("任务都执行完成了...") ;
}).whenComplete((res, tx) -> {
  System.out.println("获取到结果:" + res) ;
  if (tx != null) {
    System.err.println("发生错误了:" + tx.getMessage()) ;
  }
  executor.shutdown();
}) ;

执行结果:

图片

2.9 任意一个任务执行完成就算完成

CompletableFuture.anyOf

CompletableFuture task1 = CompletableFuture.supplyAsync(() -> {
  sleep(1000) ;
  System.out.println("我是任务1") ;
  return "Task1" ;
}, executor) ;


CompletableFuture task2 = CompletableFuture.supplyAsync(() -> {
  sleep(3000) ;
  System.out.println("我是任务2") ;
  System.out.println(1 / 0) ;
  return "Task2" ;
}, executor) ;
// 任意一个任务执行完成就算完成
// 当任务执行发生异常后,th才不会为null
CompletableFuture.anyOf(task1, task2).whenCompleteAsync((v, th) -> {
  System.out.println("v = " + v) ;
  System.out.println("th = " + th) ;
}, executor) ;

执行结果:

图片

2.10 接收上一个任务的执行结果

CompletableFuture.supplyAsync(() -> {
  sleep(2000) ;
  System.out.println("第一个任务执行完成...") ;
  // System.out.println(1 / 0) ;
  return new Random().nextInt(10000) ;
}, executor).thenAcceptAsync(res -> { // 接收上一个任务的执行结果
  System.out.println("任务执行结果:" + res) ;
}, executor) ;

执行结果:

图片

以上是本篇文章的全部内容,希望对你有帮助。

来源:Spring全家桶实战案例源码内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯