环境:Springboot2.4.11
概述
为什么创建了SpringWebFlux?
部分答案是需要一个非阻塞web堆栈来处理具有少量线程的并发性,并使用较少的硬件资源进行扩展。Servlet 3.1确实为非阻塞I/O提供了一个API。但是,使用它会导致Servlet API的其他部分出现偏差。这是一个新的公共API在任何非阻塞运行时作为基础的动机。这一点很重要,因为服务器(如Netty)在异步、非阻塞空间中建立良好。
答案的另一部分是函数式编程。正如Java 5中添加注释创造了机会(如带注释的REST控制器或单元测试),Java 8中添加lambda表达式也为Java中的函数API创造了机会。这对于允许异步逻辑的声明性组合的非阻塞应用程序和延续式API(CompletableFuture和ReactiveX推广)是一个福音。在编程模型级别,Java8支持SpringWebFlux提供功能性web端点和带注释的控制器。
什么是反应式编程?
“反应式”指的是围绕对变化作出反应而构建的编程模型 — 响应I/O事件的网络组件、响应鼠标事件的UI控制器以及其他组件。非阻塞是反应性的,因为我们现在不是被阻塞,而是在操作完成或数据可用时对通知作出反应。还有另一个重要的机制,我们Spring团队将其与“反应性”联系起来,那就是无阻塞背压。在同步命令式代码中,阻塞调用作为一种自然形式的背压,迫使调用方等待。在非阻塞代码中,控制事件的速率变得非常重要,以便快速生产者不会压倒其目的地。
Reactive Streams 是一个小规范(在Java9中也采用),它定义了具有背压的异步组件之间的交互。例如,数据存储库(充当发布者)可以生成HTTP服务器(充当订户)可以写入响应的数据。反应流的主要目的是让订阅者控制发布者生成数据的速度。
Reactive Streams 在互操作性方面起着重要作用。它对库和基础结构组件很感兴趣,但作为应用程序API不太有用,因为它的级别太低。应用程序需要更高级别、更丰富、功能更强大的API来组成异步逻辑 — 与Java8流API类似,但不仅仅针对集合。这就是反应式库所扮演的角色。
Reactor是Spring WebFlux的首选反应库。它提供了Mono和Flux API类型,通过与ReactiveX操作符词汇表对齐的一组丰富的操作符来处理0..1(Mono)和0..N(Flux)的数据序列。反应器是一个反应流库,因此,其所有操作员都支持非阻塞背压。Reactor非常关注服务器端Java。它是与Spring密切合作开发的。
WebFlux需要Reactor作为核心依赖项,但它可以通过反应流与其他反应库进行互操作。作为一般规则,WebFlux API接受普通发布服务器作为输入,在内部将其调整为反应器类型,使用该类型,并返回Flux或Mono作为输出。
编程模型
Spring Web模块包含了Spring WebFlux的基础,包括HTTP抽象、支持服务器的反应流适配器、编解码器和与Servlet API相媲美的核心WebHandler-API,但是具有非阻塞契约。
在此基础上,Spring WebFlux提供了两种编程模型的选择:
- Annotated Controllers:与SpringMVC一致,并基于SpringWeb模块中相同的注释。SpringMVC和WebFlux控制器都支持反应式(Reactor和RxJava)返回类型,因此很难区分它们。一个显著的区别是WebFlux还支持反应式@RequestBody参数。
- Functional Endpoints: 基于Lambda的轻量级功能性编程模型。您可以将其视为一个小型库或一组应用程序可用于路由和处理请求的实用程序。带注释控制器的最大区别在于,应用程序从头到尾负责请求处理,而不是通过注释声明意图并被回调。
适用性
Spring MVC or WebFlux如何选择?
它们可以并排使用,并且来自各方的反馈对双方都有利。下图显示了两者之间的关系、它们的共同点以及各自唯一支持的内容:
我们建议你考虑以下几点:
- 如果你有一个运行良好的SpringMVC应用程序,则无需进行更改。命令式编程是编写、理解和调试代码的最简单方法。您可以选择最多的库,因为从历史上看,大多数库都是阻塞的。
- 如果你已经在购买非阻塞web堆栈,Spring WebFlux提供了与此领域其他产品相同的执行模型优势,还提供了服务器选择(Netty、Tomcat、Jetty、Undertow和Servlet 3.1+容器)、编程模型选择(annotated controllers and functional web endpoints),以及可选择的反应库(Reactor、RxJava或其他)。
- 如果你对用于Java8 Lambdas或Kotlin的轻量级功能性web框架感兴趣,可以使用SpringWebFlux功能性web端点。对于需求不太复杂的小型应用程序或微服务来说,这也是一个不错的选择,因为它们可以从更高的透明度和控制中获益。
- 在微服务体系结构中,可以混合使用具有SpringMVC或SpringWebFlux控制器或SpringWebFlux功能端点的应用程序。在两个框架中都支持相同的基于注释的编程模型,这使得重用知识更加容易,同时也为正确的工作选择了正确的工具。
- 评估应用程序的一种简单方法是检查其依赖性。如果你有块持久性API(JPA、JDBC)或网络API可供使用,SpringMVC至少是通用体系结构的最佳选择。对于Reactor和RxJava来说,在单独的线程上执行阻塞调用在技术上是可行的,但是你不会充分利用非阻塞web堆栈。
- 如果你有一个SpringMVC应用程序,可以调用远程服务,请尝试使用反应式WebClient。你可以直接从SpringMVC控制器方法返回反应类型(Reactor、RxJava或其他)。每次调用的延迟或调用之间的相互依赖性越大,好处就越显著。SpringMVC控制器也可以调用其他反应组件。
- 如果你有一个庞大的团队,请记住,在向非阻塞、函数式和声明式编程转变的过程中,学习曲线很陡峭。在没有完全转换的情况下启动的一种实用方法是使用反应式WebClient。除此之外,从小事做起,衡量效益。我们预计,对于广泛的应用,这种转变是不必要的。如果您不确定要寻找哪些好处,请从了解非阻塞I/O的工作原理(例如,单线程Node.js上的并发)及其效果开始。
应用服务
Spring WebFlux在Tomcat、Jetty、Servlet3.1+容器以及Netty和Undertow等非Servlet运行时上受支持。所有服务器都适用于低级别的通用API,以便跨服务器支持更高级别的编程模型。
Spring WebFlux没有启动或停止服务器的内置支持。然而,从Spring配置和WebFlux基础设施组装应用程序并用几行代码运行它是很容易的。
Spring Boot有一个WebFlux启动器,可以自动执行这些步骤。默认情况下,初学者使用Netty,但通过更改Maven或Gradle依赖项,可以很容易地切换到Tomcat、Jetty或Undertow。springboot默认为Netty,因为它更广泛地用于异步、无阻塞,并允许客户端和服务器共享资源。
Tomcat和Jetty可以与Spring MVC和WebFlux一起使用。但是,请记住,它们的使用方式是非常不同的。SpringMVC依赖于Servlet阻塞I/O,并允许应用程序在需要时直接使用Servlet API。Spring WebFlux依赖于Servlet3.1非阻塞I/O,并在低级适配器后面使用ServletAPI。它不暴露直接使用。
对于Undertow,Spring WebFlux直接使用Undertow API,而不使用ServletAPI。
性能
性能有许多特点和意义。响应式和非阻塞通常不会使应用程序运行得更快。在某些情况下,它们可以(例如,如果使用WebClient并行运行远程调用)。总的来说,它需要更多的工作来完成非阻塞方式的事情,这可以稍微增加所需的处理时间。
反应式和非阻塞的主要预期好处是能够用少量固定数量的线程和更少的内存进行扩展。这使得应用程序在负载下更具弹性,因为它们以更可预测的方式扩展。然而,为了观察这些好处,你需要有一些延迟(包括缓慢和不可预测的网络I/O混合)。这就是反应堆栈开始显示其优势的地方,而差异可能是巨大的。
线程模型
在SpringMVC(以及一般的servlet应用程序)中,假定应用程序可以阻止当前线程(例如,远程调用)。由于这个原因,servlet容器使用一个大的线程池来吸收请求处理期间的潜在阻塞。
在Spring WebFlux(以及一般的非阻塞服务器)中,假定应用程序不阻塞。因此,非阻塞服务器使用一个小的、固定大小的线程池(事件循环工作者)来处理请求。
- 调用阻塞API
如果你确实需要使用阻塞库怎么办?Reactor和RxJava都提供publishOn操作符,以便在不同的线程上继续处理。这意味着有一个容易逃生的舱口。但是,请记住,阻塞API并不适合此并发模型。
- 易变状态
在Reactor和RxJava中,通过操作符声明逻辑。在运行时,会形成一个反应式管道,在该管道中,数据会在不同的阶段按顺序处理。这样做的一个关键好处是,它使应用程序不必保护可变状态,因为该管道中的应用程序代码永远不会被并发调用。
- 线程模型
在运行Spring WebFlux的服务器上,你希望看到哪些线程?
在“普通”Spring WebFlux服务器上(例如,没有数据访问或其他可选依赖项),您可以期望服务器有一个线程,其他几个线程用于请求处理(通常与CPU核数相同)。然而,Servlet容器可以从更多线程开始(例如,Tomcat上的10个线程),以支持Servlet(阻塞)I/O和Servlet 3.1(非阻塞)I/O使用。
反应式WebClient以事件循环方式运行。因此,您可以看到与此相关的少量固定数量的处理线程(例如,reactor http nio-带有reactor Netty连接器)。但是,如果Reactor Netty同时用于客户端和服务器,则默认情况下,这两个服务器共享事件循环资源。
Reactor和RxJava提供了线程池抽象,称为调度程序,与publishOn操作符一起使用,publishOn操作符用于将处理切换到不同的线程池。调度程序的名称表示特定的并发策略 — 例如,“并行”(用于CPU绑定的、线程数量有限的工作)或“弹性”(用于I/O绑定的、线程数量较多的工作)。如果你看到这样的线程,则意味着某些代码正在使用特定的线程池调度程序策略。
数据访问库和其他第三方依赖项也可以创建和使用自己的线程。
引入依赖
-
org.springframework.boot -
spring-boot-starter-webflux -
- --R2DBC是基于Reactive Streams标准来设计的。通过使用R2DBC,你可以使用reactive API来操作数据。同时R2DBC只是一个开放的标准,而各个具体的数据库连接实现,需要实现这个标准。-->
-
org.springframework.boot -
spring-boot-starter-data-r2dbc -
- --响应式编程传统的jdbc操作是阻塞式的,所以不能再用以前的mysql驱动了-->
-
dev.miku -
r2dbc-mysql -
相关配置
- spring:
- r2dbc:
- #连接数据库的url,前缀不再是jdbc而是换成r2dbc
- #这里可以配置连接池相关的其它属性,这里为了简洁不配置
- url: r2dbc:mysql://localhost:3306/testjpa
- username: root
- password: 123123
PO定义
- @Table(value = "reactive_users")
- public class Users implements Serializable {
- @Id
- private Long id ;
- private Integer age ;
- private String name ;
- }
DAO层定义
- // 在使用JPA时经常用到的是JpaRepository等;在反应式编程中不能在使用了,只能用如下的接口
- public interface UsersRepository extends ReactiveCrudRepository
, ReactiveSortingRepository{ - // 这里的方法名定义还是与使用data-jpa时一样的定义
- public Mono
findByName(String name) ; - }
Service层定义
- @Service
- public class UsersService {
-
- @Resource
- private UsersRepository usersRepository ;
-
- @Transactional
- public Mono
save(Users users) { - return usersRepository.save(users) ;
- }
-
- public Mono
getUsers(Long id) { - return usersRepository.findById(id) ;
- }
-
- public Flux
list() { - return usersRepository.findAll() ;
- }
-
- public Mono
getUsersByName(String name) { - return usersRepository.findByName(name) ;
- }
-
- }
Service中定义了CURD操作。
Controller接口定义
- @RestController
- @RequestMapping("/users")
- public class UsersController {
-
- @Resource
- private UsersService usersService ;
-
- @PostMapping("/save")
- public Mono
save(@RequestBody Users user) { - Mono
res = usersService.save(user) ; - return res.flatMap(new Function
>() { - @Override
- public Mono
apply(Users t) { - return Mono.just(t.getId()) ;
- }
- }) ;
- }
-
- @GetMapping("/{id}")
- public Mono
getUsers(@PathVariable( "id") Long id) { - return usersService.getUsers(id) ;
- }
-
- @GetMapping("/lists")
- public Flux
list() { - return usersService.list() ;
- }
-
- @GetMapping("/name")
- public Mono
name(String name) { - return usersService.getUsersByName(name) ;
- }
-
- }
Controller的定义还是与传统的定义方式差不多,只是返回值要么是Mono(单一值),要么是Flux(集合)对象。