什么是响应式编程?
响应式编程是一种编程范式,它关注数据的变化和传播,而不是控制流。响应式编程可以提高程序的性能、弹性和可伸缩性,使程序能够及时响应用户的需求和环境的变化。在本文中,我们将介绍Java中的响应式编程的基本概念、原理和实践。
响应式编程的核心思想是将数据和行为抽象为流(Stream),流可以表示任何异步的事件或值,比如用户输入、网络请求、数据库查询等。流可以被观察(Observable),也就是说,可以有一个或多个观察者(Observer)订阅流,并在流发生变化时接收通知。流还可以被操作(Operator),也就是说,可以对流进行各种转换、过滤、组合等操作,从而生成新的流。
响应式编程的优势:
- 可以将复杂的异步逻辑简化为声明式的数据流操作,避免了回调地狱(Callback Hell)、阻塞线程和竞态条件等问题。
- 可以提高程序的性能和资源利用率,通过减少线程的上下文切换和阻塞,以及利用反应流的背压(Backpressure)机制来控制数据流的速度,也就是说,可以让下游的观察者控制上游的数据源的发送速率,从而防止数据溢出或浪费。
- 可以提高程序的表达力和灵活性,通过使用函数式编程的风格和操作符来组合和转换数据流,以及利用反应式编程框架和库来简化异步和事件驱动编程的复杂度
响应式编程的缺点:
- 降低程序的可读性和维护性,通过使用嵌套、回调、订阅等方式来处理异步事件,以及使用反应流的操作符来处理数据流,可能导致代码难以理解和调试。
Java中的响应式编程
Java中有多种框架和库可以实现响应式编程,比如RxJava、Spring Reactor、Vert.x等。这些框架和库都遵循了Reactive Streams规范,这是一套定义了非阻塞背压的异步流处理标准的接口。
Reactive Streams规范主要包括四个接口:
- Publisher:发布者,表示一个数据源,可以发出零个或多个数据,并通知订阅者完成或出错。
- Subscriber:订阅者,表示一个数据消费者,可以订阅一个发布者,并在收到数据、完成或出错时做出相应的动作。
- Subscription:订阅,表示发布者和订阅者之间的关系,可以用来请求或取消数据。
- Processor:处理器,表示一个既是发布者又是订阅者的中间组件,可以对数据进行处理或转换。
Project Reactor
Project Reactor是一个完全非阻塞的包含背压支持的响应式编程基石。它是Spring生态系统中Spring Reactive的基础,被用于如Spring WebFlux, Spring Data和Spring Cloud Gateway等项目中。
基本概念
Project Reactor的核心思想是将数据和事件看作是流(stream),流可以被创建,转换,过滤,合并,分组,缓冲,错误处理等等。流是惰性的,只有当有订阅者(subscriber)订阅时才会开始发射数据或事件。流可以是有限的,也可以是无限的,可以是同步的,也可以是异步的,可以是单线程的,也可以是多线程的。流还可以支持背压(backpressure),即订阅者可以控制流的速度,避免被过多的数据或事件淹没。
核心组件
Project Reactor提供了两个主要的接口来表示流:
- Flux: 表示一个包含0到N个元素的流
- Mono: 表示一个包含0到1个元素的流。
它们都是Publisher<T>的实现,可以发出0-N个元素的异步序列,并根据订阅者的需求推送元素。
Flux表示的是包含0到N个元素的异步序列,可以被onComplete信号或者onError信号所终止。
Mono表示的是包含0或1个元素的异步序列,也可以被onComplete信号或者onError信号所终止。
Flux和Mono之间可以进行转换。
代码示例
Mono
// 创建一个Mono对象,包含一个字符串元素
Mono<String> mono = Mono.just("Hello World");
// 订阅这个Mono对象,并打印元素值
mono.subscribe(System.out::println);
使用Mono.just方法创建了一个包含一个字符串元素的Mono对象,然后使用subscribe方法订阅了这个对象,并提供了一个回调函数来打印元素值。当Mono对象发出元素值时,回调函数就会被调用。
Mono to Flux
把Mono转换成Flux的一种方法是使用flux()方法,它会返回一个包含Mono发出的元素的Flux,或者如果Mono为空,则返回一个空的Flux。例如:
// 创建一个Mono对象,包含一个整数元素
Mono<Integer> mono = Mono.just(1);
// 使用flux()方法把Mono转换成Flux
Flux<Integer> flux = mono.flux();
// 订阅这个Flux对象,并打印元素值
flux.subscribe(System.out::println); // 输出1
另一种方法是使用concatWith()方法,它会将Mono与另一个Publisher连接起来,形成一个Flux。例如:
// 创建一个Mono对象,包含一个整数元素
Mono<Integer> mono = Mono.just(1);
// 创建一个Flux对象,包含两个整数元素
Flux<Integer> flux = Flux.just(2, 3);
// 使用concatWith()方法把Mono和Flux连接起来
Flux<Integer> result = mono.concatWith(flux);
// 订阅这个Flux对象,并打印元素值
result.subscribe(System.out::println); // 输出1, 2, 3
Mono常用的操作
// 从一个固定的值创建一个Mono
Mono.just("Hello").subscribe(System.out::println); // 输出Hello
// 从一个Callable对象创建一个Mono
Callable<String> callable = () -> "World";
Mono.fromCallable(callable).subscribe(System.out::println); // 输出World
// 从一个Supplier对象创建一个Mono
Supplier<String> supplier = () -> "Supplier!";
Mono.fromSupplier(supplier).subscribe(System.out::println); // 输出Supplier!
// 对Mono发出的元素进行映射操作
Mono.just("Hello").map(s -> s + " World").subscribe(System.out::println); // 输出Hello World
// 对Mono发出的元素进行扁平化操作
Mono.just("Hello")
.flatMap(s -> Mono.just(s + " World"))
.subscribe(System.out::println); // 输出Hello World
// 对Mono发出的元素进行过滤操作
Mono.just(1).filter(i -> i > 0).subscribe(System.out::println); // 输出1
// 将多个Mono合并为一个Mono
Mono.zip(Mono.just("Hello"), Mono.just("World"))
.subscribe(tuple -> System.out.println(tuple.getT1() + " " + tuple.getT2())); // 输出Hello World
// 将多个Mono合并为一个Flux
Mono.just("Hello")
.mergeWith(Mono.just("World"))
.subscribe(System.out::println); // 输出Hello World
// 在这个Mono完成后,继续处理另一个发布者
Mono.just("Hello").then(Mono.just("World")).subscribe(System.out::println); // 输出World
// 在这个Mono发出元素时,执行一个副作用操作
Mono.just("Hello")
.doOnNext(s -> System.out.println("Before: " + s))
.map(s -> s + " World")
.doOnNext(s -> System.out.println("After: " + s))
.subscribe();
// 输出
// Before: Hello
// After: Hello World
Flux
// 创建一个Flux对象,包含三个整数元素
Flux<Integer> flux = Flux.just(1, 2, 3);
// 订阅这个Flux对象,并打印元素值
flux.subscribe(System.out::println);
使用Flux.just方法创建了一个包含三个整数元素的Flux对象,然后使用subscribe方法订阅了这个对象,并提供了一个回调函数来打印元素值。当Flux对象发出元素值时,回调函数就会被调用。
Flux to Mono
把Flux转换成Mono的一种方法是使用next()方法,它会返回Flux发出的第一个元素,或者如果Flux为空,则返回一个空的Mono。
例如:
// 创建一个Flux对象,包含三个整数元素
Flux<Integer> flux = Flux.just(1, 2, 3);
// 使用next()方法把Flux转换成Mono
Mono<Integer> mono = flux.next();
// 订阅这个Mono对象,并打印元素值
mono.subscribe(System.out::println); // 输出1
另一种方法是使用collectList()方法,它会把Flux发出的所有元素收集到一个列表中,并返回一个包含这个列表的Mono。
例如:
// 创建一个Flux对象,包含三个整数元素
Flux<Integer> flux = Flux.just(1, 2, 3);
// 使用collectList()方法把Flux转换成Mono
Mono<List<Integer>> mono = flux.collectList();
// 订阅这个Mono对象,并打印元素值
mono.subscribe(System.out::println); // 输出[1, 2, 3]
Flux常用的操作
// 从多个固定的值创建一个Flux
Flux.just("Hello", "World").subscribe(System.out::println); // 输出Hello World
// 从一个数组对象创建一个Flux
String[] array = {"Hello", "World"};
Flux.fromArray(array).subscribe(System.out::println); // 输出Hello World
// 从一个Iterable对象创建一个Flux
List<String> list = Arrays.asList("Hello", "World");
Flux.fromIterable(list).subscribe(System.out::println); // 输出Hello World
// 从一个Stream对象创建一个Flux
Stream<String> stream = Stream.of("Hello", "World");
Flux.fromStream(stream).subscribe(System.out::println); // 输出Hello World
// 创建一个包含指定范围内整数的Flux
Flux.range(1, 5).subscribe(System.out::println); // 输出1 2 3 4 5
// 创建一个按照指定时间间隔从0整数递增的Flux
Duration duration = Duration.ofSeconds(1);
Flux<Long> interval = Flux.interval(duration);
interval.subscribe(System.out::println);
// 使用blockLast阻塞主线程,防止程序立即退出
interval.blockLast();
// 输出结果每秒打印一次
// 0
// 1
// 2
// 3
// 4
// ...
// 对Flux发出的每个元素进行映射操作
Flux.just("Hello", "World").map(s -> s + "!")
.subscribe(System.out::println); // 输出Hello! World!
// 对Flux发出的每个元素进行扁平化操作
Flux.just("Hello", "World")
.flatMap(s -> Flux.just(s + "!"))
.subscribe(System.out::println); //输出Hello! World!
// 对Flux发出的每个元素进行过滤操作
Flux.range(1, 5).filter(i -> i % 2 == 0).subscribe(System.out::println);
// 输出2 4
// 将多个Flux合并为一个Flux
Flux.zip(Flux.just("Hello"), Flux.just("World"))
.subscribe(tuple -> System.out.println(tuple.getT1() + " " + tuple.getT2()));
// 输出Hello World
// 将多个Flux合并为一个Flux
Flux.just("Hello").mergeWith(Flux.just("World")).subscribe(System.out::println);
// 输出Hello World
// 将多个Flux合并为一个Flux
Flux.just("Hello").concatWith(Flux.just("World")).subscribe(System.out::println);
// 输出Hello World
// 将所有元素收集到一个List中
Flux.just("Hello", "World").collectList().subscribe(list -> System.out.println(list)); // 输出[Hello, World]
Flux的zip、mergeWith、concatWith区别
zip、mergeWith和concatWith都是用来将多个Flux合并为一个Flux的操作,但是它们有一些区别:
zip会将多个Flux的元素按照一对一的方式进行合并,形成一个包含元组的Flux,每个元组中包含了每个源Flux的一个元素。如果源Flux的元素个数不一致,那么zip会以最短的Flux为基准,多余的元素会被丢弃。
Flux<String> flux1 = Flux.just("A", "B", "C");
Flux<Integer> flux2 = Flux.just(1, 2, 3, 4);
Flux<Tuple2<String, Integer>> flux3 = Flux.zip(flux1, flux2);
flux3.subscribe(tuple -> System.out.println(tuple.getT1() + " " + tuple.getT2()));
// 输出A 1 B 2 C 3
// 4不会输出,因为最短的Flux是flux1,长度是3
mergeWith会将多个Flux的元素按照时间顺序进行合并,形成一个包含所有元素的Flux。如果源Flux的元素有重叠,那么mergeWith会保留所有的元素。
Duration duration1 = Duration.ofMillis(100);
Duration duration2 = Duration.ofMillis(200);
Flux<String> flux1 = Flux.interval(duration1).map(i -> "A" + i);
Flux<String> flux2 = Flux.interval(duration2).map(i -> "B" + i);
Flux<String> flux = flux1.mergeWith(flux2);
flux.subscribe(System.out::println);
// 输出A0 B0 A1 B1 A2 A3 B2 A4 B3 A5 ...
concatWith会将多个Flux的元素按照订阅顺序进行合并,形成一个包含所有元素的Flux。如果源Flux的元素有重叠,那么concatWith会保留所有的元素。concatWith会等待上一个源Flux完成后才订阅下一个源Flux。
Duration duration1 = Duration.ofMillis(100);
Duration duration2 = Duration.ofMillis(200);
// 每100ms递增1,打印5次结束
Flux<String> flux1 = Flux.interval(duration1).map(i -> "A" + i).take(5);
Flux<String> flux2 = Flux.interval(duration2).map(i -> "B" + i).take(5);
Flux<String> flux3 = flux1.concatWith(flux2);
flux3.subscribe(System.out::println);
// 避免程序立即退出
flux3.blockLast();
// 输出
// A0
// A1
// A2
// A3
// A4
// B0
// B1
// B2
// B3
// B4
可以看到,Project Reactor和Java 8 Stream的用法看起来很像,因为它们都提供了一些函数式编程的方法,用来对数据流进行操作,例如map、filter、reduce等。但是它们的本质是不同的,主要有以下几个区别:
- Project Reactor是基于Reactive Streams规范的一个实现,它支持异步、非阻塞、反应式的编程模式,而Java 8 Stream是基于集合类的一个扩展,它支持同步、阻塞、命令式的编程模式。
- Project Reactor是基于Push模式的,它可以让数据源主动推送数据给订阅者,并且支持背压机制,让订阅者可以控制数据的流速,而Java 8 Stream是基于Pull模式的,它需要订阅者主动拉取数据源的数据,并且没有背压机制,可能会导致内存溢出或者性能下降。
- Project Reactor可以处理无限流或者有限流,它可以通过短路操作来终止无限流,而Java 8 Stream只能处理有限流,它不能处理无限流或者异步流。
- Project Reactor可以在多线程或者单线程环境下运行,它可以通过parallel或者sequential方法来切换并行或者串行模式,而Java 8 Stream只能在单线程环境下运行,它只能通过parallelStream方法来创建并行流。
本文只介绍了Project Reactor的基本概念和用法,包括Flux和Mono的创建、转换、过滤、合并等操作。如果你想深入学习Project Reactor,你可以参考官方文档或者其他相关的资料,更多关于Project Reactor 响应式编程的资料请关注编程网其它相关文章!