文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Project Reactor响应式编程是什么

2023-07-05 20:48

关注

这篇文章主要介绍了Project Reactor响应式编程是什么的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇Project Reactor响应式编程是什么文章都会有所收获,下面我们一起来看看吧。

什么是响应式编程?

响应式编程是一种编程范式,它关注数据的变化和传播,而不是控制流。响应式编程可以提高程序的性能、弹性和可伸缩性,使程序能够及时响应用户的需求和环境的变化。在本文中,我们将介绍Java中的响应式编程的基本概念、原理和实践。

响应式编程的核心思想是将数据和行为抽象为流(Stream),流可以表示任何异步的事件或值,比如用户输入、网络请求、数据库查询等。流可以被观察(Observable),也就是说,可以有一个或多个观察者(Observer)订阅流,并在流发生变化时接收通知。流还可以被操作(Operator),也就是说,可以对流进行各种转换、过滤、组合等操作,从而生成新的流。

响应式编程的优势:

响应式编程的缺点:

Java中的响应式编程

Java中有多种框架和库可以实现响应式编程,比如RxJava、Spring Reactor、Vert.x等。这些框架和库都遵循了Reactive Streams规范,这是一套定义了非阻塞背压的异步流处理标准的接口。

Reactive Streams规范主要包括四个接口:

Project Reactor

Project Reactor是一个完全非阻塞的包含背压支持的响应式编程基石。它是Spring生态系统中Spring Reactive的基础,被用于如Spring WebFlux, Spring Data和Spring Cloud Gateway等项目中。

基本概念

Project Reactor的核心思想是将数据和事件看作是流(stream),流可以被创建,转换,过滤,合并,分组,缓冲,错误处理等等。流是惰性的,只有当有订阅者(subscriber)订阅时才会开始发射数据或事件。流可以是有限的,也可以是无限的,可以是同步的,也可以是异步的,可以是单线程的,也可以是多线程的。流还可以支持背压(backpressure),即订阅者可以控制流的速度,避免被过多的数据或事件淹没。

核心组件

Project Reactor提供了两个主要的接口来表示流:

它们都是Publisher<T>的实现,可以发出0-N个元素的异步序列,并根据订阅者的需求推送元素。

Flux表示的是包含0到N个元素的异步序列,可以被onComplete信号或者onError信号所终止。

Mono表示的是包含0或1个元素的异步序列,也可以被onComplete信号或者onError信号所终止。

Flux和Mono之间可以进行转换。

代码示例

Mono

// 创建一个Mono对象,包含一个字符串元素Mono<String> mono = Mono.just("Hello World");// 订阅这个Mono对象,并打印元素值mono.subscribe(System.out::println);

使用Mono.just方法创建了一个包含一个字符串元素的Mono对象,然后使用subscribe方法订阅了这个对象,并提供了一个回调函数来打印元素值。当Mono对象发出元素值时,回调函数就会被调用。

Mono to Flux

把Mono转换成Flux的一种方法是使用flux()方法,它会返回一个包含Mono发出的元素的Flux,或者如果Mono为空,则返回一个空的Flux。例如:

// 创建一个Mono对象,包含一个整数元素Mono<Integer> mono = Mono.just(1);// 使用flux()方法把Mono转换成FluxFlux<Integer> flux = mono.flux();// 订阅这个Flux对象,并打印元素值flux.subscribe(System.out::println); // 输出1

另一种方法是使用concatWith()方法,它会将Mono与另一个Publisher连接起来,形成一个Flux。例如:

// 创建一个Mono对象,包含一个整数元素Mono<Integer> mono = Mono.just(1);// 创建一个Flux对象,包含两个整数元素Flux<Integer> flux = Flux.just(2, 3);// 使用concatWith()方法把Mono和Flux连接起来Flux<Integer> result = mono.concatWith(flux);// 订阅这个Flux对象,并打印元素值result.subscribe(System.out::println); // 输出1, 2, 3

Mono常用的操作

// 从一个固定的值创建一个MonoMono.just("Hello").subscribe(System.out::println); // 输出Hello// 从一个Callable对象创建一个MonoCallable<String> callable = () -> "World";Mono.fromCallable(callable).subscribe(System.out::println); // 输出World// 从一个Supplier对象创建一个MonoSupplier<String> supplier = () -> "Supplier!";Mono.fromSupplier(supplier).subscribe(System.out::println); // 输出Supplier!// 对Mono发出的元素进行映射操作Mono.just("Hello").map(s -> s + " World").subscribe(System.out::println); // 输出Hello World// 对Mono发出的元素进行扁平化操作Mono.just("Hello")    .flatMap(s -> Mono.just(s + " World"))    .subscribe(System.out::println); // 输出Hello World// 对Mono发出的元素进行过滤操作Mono.just(1).filter(i -> i > 0).subscribe(System.out::println); // 输出1// 将多个Mono合并为一个MonoMono.zip(Mono.just("Hello"), Mono.just("World"))    .subscribe(tuple -> System.out.println(tuple.getT1() + " " + tuple.getT2())); // 输出Hello World// 将多个Mono合并为一个FluxMono.just("Hello")    .mergeWith(Mono.just("World"))    .subscribe(System.out::println); // 输出Hello World// 在这个Mono完成后,继续处理另一个发布者Mono.just("Hello").then(Mono.just("World")).subscribe(System.out::println); // 输出World// 在这个Mono发出元素时,执行一个副作用操作Mono.just("Hello")    .doOnNext(s -> System.out.println("Before: " + s))    .map(s -> s + " World")    .doOnNext(s -> System.out.println("After: " + s))    .subscribe(); // 输出// Before: Hello // After: Hello World

Flux

// 创建一个Flux对象,包含三个整数元素Flux<Integer> flux = Flux.just(1, 2, 3);// 订阅这个Flux对象,并打印元素值flux.subscribe(System.out::println);

使用Flux.just方法创建了一个包含三个整数元素的Flux对象,然后使用subscribe方法订阅了这个对象,并提供了一个回调函数来打印元素值。当Flux对象发出元素值时,回调函数就会被调用。

Flux to Mono

把Flux转换成Mono的一种方法是使用next()方法,它会返回Flux发出的第一个元素,或者如果Flux为空,则返回一个空的Mono。

例如:

// 创建一个Flux对象,包含三个整数元素Flux<Integer> flux = Flux.just(1, 2, 3);// 使用next()方法把Flux转换成MonoMono<Integer> mono = flux.next();// 订阅这个Mono对象,并打印元素值mono.subscribe(System.out::println); // 输出1

另一种方法是使用collectList()方法,它会把Flux发出的所有元素收集到一个列表中,并返回一个包含这个列表的Mono。

例如:

// 创建一个Flux对象,包含三个整数元素Flux<Integer> flux = Flux.just(1, 2, 3);// 使用collectList()方法把Flux转换成MonoMono<List<Integer>> mono = flux.collectList();// 订阅这个Mono对象,并打印元素值mono.subscribe(System.out::println); // 输出[1, 2, 3]

Flux常用的操作

// 从多个固定的值创建一个FluxFlux.just("Hello", "World").subscribe(System.out::println); // 输出Hello World// 从一个数组对象创建一个FluxString[] array = {"Hello", "World"};Flux.fromArray(array).subscribe(System.out::println); // 输出Hello World// 从一个Iterable对象创建一个FluxList<String> list = Arrays.asList("Hello", "World");Flux.fromIterable(list).subscribe(System.out::println); // 输出Hello World// 从一个Stream对象创建一个FluxStream<String> stream = Stream.of("Hello", "World");Flux.fromStream(stream).subscribe(System.out::println); // 输出Hello World// 创建一个包含指定范围内整数的FluxFlux.range(1, 5).subscribe(System.out::println); // 输出1 2 3 4 5// 创建一个按照指定时间间隔从0整数递增的FluxDuration duration = Duration.ofSeconds(1);Flux<Long> interval = Flux.interval(duration);interval.subscribe(System.out::println);// 使用blockLast阻塞主线程,防止程序立即退出interval.blockLast();// 输出结果每秒打印一次// 0// 1// 2// 3// 4// ...// 对Flux发出的每个元素进行映射操作Flux.just("Hello", "World").map(s -> s + "!")    .subscribe(System.out::println); // 输出Hello! World!// 对Flux发出的每个元素进行扁平化操作Flux.just("Hello", "World")    .flatMap(s -> Flux.just(s + "!"))    .subscribe(System.out::println); //输出Hello! World!// 对Flux发出的每个元素进行过滤操作Flux.range(1, 5).filter(i -> i % 2 == 0).subscribe(System.out::println); // 输出2 4// 将多个Flux合并为一个FluxFlux.zip(Flux.just("Hello"), Flux.just("World"))    .subscribe(tuple -> System.out.println(tuple.getT1() + " " + tuple.getT2())); // 输出Hello World// 将多个Flux合并为一个FluxFlux.just("Hello").mergeWith(Flux.just("World")).subscribe(System.out::println); //  输出Hello World// 将多个Flux合并为一个FluxFlux.just("Hello").concatWith(Flux.just("World")).subscribe(System.out::println); // 输出Hello World// 将所有元素收集到一个List中Flux.just("Hello", "World").collectList().subscribe(list -> System.out.println(list)); // 输出[Hello, World]

Flux的zip、mergeWith、concatWith区别

zip、mergeWith和concatWith都是用来将多个Flux合并为一个Flux的操作,但是它们有一些区别:

zip会将多个Flux的元素按照一对一的方式进行合并,形成一个包含元组的Flux,每个元组中包含了每个源Flux的一个元素。如果源Flux的元素个数不一致,那么zip会以最短的Flux为基准,多余的元素会被丢弃。

  Flux<String> flux1 = Flux.just("A", "B", "C");  Flux<Integer> flux2 = Flux.just(1, 2, 3, 4);  Flux<Tuple2<String, Integer>> flux3 = Flux.zip(flux1, flux2);  flux3.subscribe(tuple -> System.out.println(tuple.getT1() + " " + tuple.getT2()));   // 输出A 1 B 2 C 3  // 4不会输出,因为最短的Flux是flux1,长度是3

mergeWith会将多个Flux的元素按照时间顺序进行合并,形成一个包含所有元素的Flux。如果源Flux的元素有重叠,那么mergeWith会保留所有的元素。

  Duration duration1 = Duration.ofMillis(100);  Duration duration2 = Duration.ofMillis(200);  Flux<String> flux1 = Flux.interval(duration1).map(i -> "A" + i);  Flux<String> flux2 = Flux.interval(duration2).map(i -> "B" + i);  Flux<String> flux = flux1.mergeWith(flux2);  flux.subscribe(System.out::println);   // 输出A0 B0 A1 B1 A2 A3 B2 A4 B3 A5 ...

concatWith会将多个Flux的元素按照订阅顺序进行合并,形成一个包含所有元素的Flux。如果源Flux的元素有重叠,那么concatWith会保留所有的元素。concatWith会等待上一个源Flux完成后才订阅下一个源Flux。

  Duration duration1 = Duration.ofMillis(100);  Duration duration2 = Duration.ofMillis(200);  // 每100ms递增1,打印5次结束  Flux<String> flux1 = Flux.interval(duration1).map(i -> "A" + i).take(5);  Flux<String> flux2 = Flux.interval(duration2).map(i -> "B" + i).take(5);  Flux<String> flux3 = flux1.concatWith(flux2);  flux3.subscribe(System.out::println);  // 避免程序立即退出  flux3.blockLast();  // 输出   // A0  // A1  // A2  // A3  // A4  // B0  // B1  // B2  // B3  // B4

可以看到,Project Reactor和Java 8 Stream的用法看起来很像,因为它们都提供了一些函数式编程的方法,用来对数据流进行操作,例如map、filter、reduce等。但是它们的本质是不同的,主要有以下几个区别:

关于“Project Reactor响应式编程是什么”这篇文章的内容就介绍到这里,感谢各位的阅读!相信大家对“Project Reactor响应式编程是什么”知识都有一定的了解,大家如果还想学习更多知识,欢迎关注编程网行业资讯频道。

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     807人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     351人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     314人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     433人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     221人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯