文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

新一代WebFlux框架核心技术Reactor响应式编程基本用法

2024-11-30 04:48

关注

1. 前言

在响应式编程中,Project Reactor提供了两个核心的概念:Mono和Flux。Mono和Flux都是Reactor中的Publisher,它们可以产生并发布数据,然后可以被订阅和消费。这两个概念在WebFlux中有着广泛的应用,帮助我们实现异步和非阻塞的编程模型。

在这个主题中,我们将深入探讨Mono和Flux的基本使用。我们将了解它们如何被创建,如何订阅它们的事件,以及如何处理错误和完成通知。通过学习这些内容,你将能够更好地理解WebFlux的响应式编程模型,并能够在你的项目中有效地使用Mono和Flux。

让我们开始吧!

2. 环境依赖


  io.projectreactor
  reactor-core


  
    
      io.projectreactor
      reactor-bom
      ${reactor.version}
      pom
      import
    
  

3. Mono & Flux介绍

Flux

Flux表示了0到N个元素序列,下图展示了Flux如何转换元素

Flux

一个Flux是一个标准的Publisher,它表示一个由0到N个发射项目组成的异步序列,可选地由一个完成信号或一个错误终止。在响应式流规范中,这三种类型的信号转换为对下游订阅者的onNext、onComplete和onError方法的调用。

由于可能信号的范围很大,Flux是通用的反应式类型。请注意,所有事件,甚至是终止事件,都是可选的:只有onComplete事件才能表示一个空的有限序列,但删除onComplete事件就会得到一个无限的空序列(没什么用处,除了关于取消的测试)。类似地,无限序列不一定是空的。例如,Flux.interval(Duration)产生一个无限长的Flux,并从时钟发出规则的时标。

Mono

Mono表示了0个或1个元素序列,下图展示了Mono如何转换元素

图片

Mono

Mono是一个专门的发布者,它通过onNext信号发出最多一个项目,然后以onComplete信号终止(Mono成功,有或没有值),或只发出一个onError信号(Mono失败)。

大多数Mono实现都希望在调用onNext之后立即对其订阅者调用onComplete。Mono.never()是一个异常值:它不会发出任何信号,这在技术上并没有被禁止,但在测试之外并不是特别有用。另一方面,onNext和onError的组合是明确禁止的。

Mono只提供了可用于` Flux `的操作符子集,有些操作符(特别是那些将Mono与另一个`Publisher`结合的操作符)会切换到`Flux`。例如,Mono#concatWith(Publisher)返回一个Flux,而Mono#then(Mono)返回另一个Mono。

注意,你可以使用Mono来表示只有完成概念的无值异步进程(类似于Runnable)。要创建一个,可以使用一个空的Mono

4. Mono & Flux常用操作

Mono常用操作

Mono.just(T value)方法:创建一个包含指定值的Mono对象。

Mono.just(10).subscribe(System.out::println) ;

Mono.empty()方法:创建一个空的Mono对象,即不包含任何元素。

Mono.justOrEmpty(T value)方法:如果指定值不为null,则创建一个包含该值的Mono对象;否则创建一个空的Mono对象。

// 输出10
Mono.justOrEmpty(10).subscribe(System.out::println) ;
// 如果值为null,没有任何输出
Mono.justOrEmpty(null).subscribe(System.out::println) ;

图片

Mono.fromCallable(Callable supplier)方法:创建一个Mono对象,该对象包含通过调用给定Callable对象的call()方法得到的返回值。

// 通过Callable方式,我们可以在内部执行其它一些动作
Mono.fromCallable(() -> 666).subscribe(System.out::println) ;

图片

Mono.fromSupplier(Supplier supplier)方法:创建一个Mono对象,该对象包含通过调用给定Supplier对象的get()方法得到的返回值。

Mono.fromSupplier(() -> 666).subscribe(System.out::println) ;

图片

Mono.fromFuture(CompletableFuture future)方法:创建一个Mono对象,该对象包含通过调用给定CompletableFuture对象

Mono.fromFuture(CompletableFuture.supplyAsync(() -> 666)).subscribe(System.out::println) ;

图片

下面这个示例完整的展示了当发生异常后的处理

public static Mono invoke(Mono user) {
  return user.flatMap(u -> {
    if ("admin".equals(u.getName())) {
      return Mono.error(new RuntimeException("越权")) ;
    }
    u.setName(u.getName() + " - ");
    return Mono.just(u) ;
  });
}


public static void main(String[] args) {
  invoke(Mono.just(new Users("admin")))
    .doOnNext(System.out::println)
    .doOnError(e -> {
      System.out.println(e.getMessage()) ;
    })
    // .onErrorResume(e -> Mono.just(new Users(e.getMessage() + " - fallback"))) // 功能更强,可以对捕获的异常进行响应的处理,然后再返回一个值
    .onErrorReturn(new Users("return")) // 捕获异常,简单粗暴直接返回一个静态值
    .doOnNext(System.out::println)
    .subscribe(); 


}
越权
Users [name=return]

图片

图片

将该Mono的发射与提供的发布者连接(不交错)。

Mono.just(10).concatWith(Mono.just(20)).subscribe(System.out::println) ;

图片

该操作符是在当前Mono执行完成后切换到另外一个Mono。

Mono.just(10).doOnNext(System.out::println)
  .then(Mono.just(666)) // 切换到另外一个Mono通道, 忽略之前的Mono元素
  .doOnNext(System.out::println)
  .subscribe();

图片

Flux常用操作

just():直接使用元素创建Flux,即在创建Flux时拿到数据,之后有谁订阅它,就重新发送数据给订阅者。

Flux.just(1, 2, 3...)

图片

fromArray()、fromIterable()和fromStream():可以从数组、Iterable对象或Stream对象中创建Flux对象。

Flux.fromArray(new String[]{"1","2","3"});
Flux.fromIterable(List.of("a","b","c"));
Flux.fromStream(List.of("a","b","c").stream());

fromArray

图片

fromIterable

图片

fromStream

empty():创建一个不包含任何元素,只发布结束消息的序列。

图片

range(int start, int count):创建包含从start起始的count个数量的Integer对象的序列。

Flux.range(1, 10) ;

图片

Flux.error(new RuntimeException("错误")).onErrorResume(ex -> Mono.just("发生异常:" + ex.getMessage())).subscribe(System.out::println) ;

error操作符

图片

onErrorResume操作符

图片

Flux.just(1, 3, 6).flatMap(id -> {
  Mono query = Mono.fromSupplier(() -> {
    System.out.println("查询数据...") ;
    return id * 10 ;
  }).delayElement(Duration.ofSeconds(2)) ;
  Mono save = Mono.fromSupplier(() -> {
    System.out.println("保存数据...") ;
    return "success - " + id ;
  }) ;
  return Mono.when(query, save) ;
}).doOnComplete(() -> {
  System.out.println("执行完成...") ;
}).subscribe() ;

图片

Flux.just(1,2,3,4,5,6).concatMap(item -> Mono.just(item).filterWhen(r -> {
  return Mono.just(r % 2 == 0) ;
})).subscribe(System.out::println) ;

图片

总之,Reactor中的Flux和Mono是响应式编程的核心组件,它们提供了丰富的操作符和方法来处理异步数据流。因此,对于使用WebFlux的开发者来说,掌握Reactor的使用是非常重要的。

完毕!!!

来源:Spring全家桶实战案例源码内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯