艿艿连肝了几个周末,写了一篇贼长的 Spring 响应式 Web 框架 WebFlux!市面第二完整~

点击上方“芋道源码”,选择“设为星标

做积极的人,而不是积极废人!

源码精品专栏

 

摘要: 原创出处 http://www.iocoder.cn/Spring-Boot/WebFlux/ 「芋道源码」欢迎转载,保留摘要,谢谢!

  • 1. 概述
  • 2. 快速入门
  • 3. 测试接口
  • 4. 全局统一返回
  • 5. 全局异常处理
  • 7. Servlet、Filter、Listener
  • 8. Cors 跨域
  • 9. 集成响应式的 MongoDB
  • 10. 集成响应式的 Redis
  • 11. 集成响应式的 Elasticsearch
  • 12. 整合响应式的 JPA
  • 13. 整合响应式的 R2DBC 和事务
  • 14. 其他内容
  • 666. 彩蛋

本文在提供完整代码示例,可见 https://github.com/YunaiV/SpringBoot-Labs 的 lab-27 目录。

原创不易,给点个 Star 嘿,一起冲鸭!

1. 概述

友情提示:Reactive Programming ,翻译为反应式编程,又称为响应式编程。本文,我们统一使用响应式。不过,比较正确的叫法还是反应式。

Spring Framework 5 在 2017 年 9 月份,发布了 GA 通用版本。既然是一个新的大版本,必然带来了非常多的改进,其中比较重要的一点,就是将响应式编程带入了 Spring 生态。又或者说,将响应式编程“真正”带入了 Java 生态之中。

在此之前,相信绝大多数 Java 开发者,对响应式编程的概念是非常模糊的。甚至说,截止到目前 2019 年 11 月份,对于国内的 Java 开发者,也是知之甚少。

对于我们来说,最早看到的就是 Spring5 提供了一个新的 Web 框架,基于响应式编程的 Spring WebFlux 。至此,SpringMVC 在“干掉” Struts 之后,难道要开始进入 Spring 自己的两个 Web 框架的双雄争霸?

实际上,WebFlux 在出来的两年时间里,据艿艿所了解到的情况,鲜有项目从采用 SpringMVC 迁移到 WebFlux ,又或者新项目直接采用 WebFlux 。这又是为什么呢?

艿艿:V2EX 上还有这样一个讨论 《现在有公司在使用 Spring Boot 2.0 的 WebFlux 吗?》 。

响应式编程,对我们现有的编程方式,是一场颠覆,对于框架也是。

  • 在 Spring 提供的框架中,实际并没有全部实现好对响应式编程的支持。例如说,Spring Transaction 事务组件,在 Spring 5.2 M2 版本,才提供了支持响应式编程的 ReactiveTransactionManager 事务管理器。
  • 更不要说,Java 生态常用的框架,例如说 MyBatis、Jedis 等等,都暂未提供响应式编程的支持。

所以,WebFlux 想要能够真正普及到我们的项目中,不仅仅需要 Spring 自己体系中的框架提供对响应式编程的很好的支持,也需要 Java 生态中的框架也要做到如此。例如说:

艿艿:😈 Java 框架存在大量基于 ThreadLocal 线程变量,实现参数的透传,改造的成本,实际是不小的。

当然,即使如此,这也并不妨碍我们来对 WebFlux 进行一个小小的入门。毕竟,响应式编程这把火,终将熊熊燃起,烧死那些异性恋。哈哈哈~

艿艿:下面的会涉及比较多的概念,不想看的胖友,直接跳到 「2. 快速入门」 小节,直接开始 WebFlux 的入门。

1.1 响应式编程

我们先简单来了解下响应式编程的相关姿势,以保证能够看懂 WebFlux 入门的代码示例,哈哈哈~

维基百科对响应式编程定义如下:

FROM https://en.wikipedia.org/wiki/Reactive_programming

Reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. This means that it becomes possible to express static (e.g. arrays) or dynamic (e.g. event emitters) data streams with ease via the employed programming language(s).

反应式编程是一种异步编程范式,它关注数据流和变化的传播。这意味着可以通过使用编程语言轻松地表示静态(如数组)或动态(如事件发射器)数据流。

Spring 官方文档对响应式编程定义如下:

FROM https://docs.spring.io/spring-framework/docs/5.0.0.BUILD-SNAPSHOT/spring-framework-reference/html/web-reactive.html#web-reactive-programming

In plain terms reactive programming is about non-blocking applications that are asynchronous and event-driven and require a small number of threads to scale vertically (i.e. within the JVM) rather than horizontally (i.e. through clustering).

简单地说,响应式编程是关于非阻塞应用程序的,这些应用程序是异步的、事件驱动的,并且需要少量的线程来垂直伸缩(即在 JVM 中),而不是水平伸缩(即通过集群)。

😈 两个看起来都不很易懂。不过如果胖友看过 Netty 框架的介绍,会发现跟 Spring 的描述非常相像。定义如下:

FROM https://www.oschina.net/p/netty

Netty 是一个 Java 开源框架。Netty 提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

是不是都看到了异步 + 事件驱动。本质上,Netty 也是有基于响应式编程的思想。所以在下文中,我们会看到,可以使用 Netty 作为 WebFlux 的服务器。

哔哔了这么多,艿艿来用简单但不完全精准的语言尝试下。以后端 API 请求的处理来举例子。

  • 在现在主流的编程模型中,请求是被同步阻塞处理完成,返回结果给前端。
  • 在响应式的编程模型中,请求是被作为一个事件丢到线程池中执行,等到执行完毕,异步回调结果给主线程,最后返回给前端。

通过这样的方式,主线程(实际是多个,这里只是方便描述哈)不断接收请求,不负责直接同步阻塞处理,从而避免自身被阻塞。

1.2 Reactor 框架

在 Java 生态中,提供响应式编程的框架主要有 Reactor、RxJava、JDK9 Flow API 。

那么,Spring 会选择哪个框架作为其响应式编程的基础呢?

  • 首先,可以排除 JDK9 Flow API ,因为 Spring5 要支持 JDK8 版本开始。
  • 其次,Reactor 是 Spring 公司 Pivotal(咳咳咳,2019 年竟然被 VMWare 收购了)是开源的框架,所以必然是“强强联合”,嘿嘿。

如果胖友想要了解 Reactor 和 RxJava 的对比,可以看看 《八个层面比较 Java 8, RxJava, Reactor》 文章,挺详细的。

让我们一起来看看 Reactor 官方对自己的介绍:

FROM https://projectreactor.io/

Reactor is a fourth-generation Reactive library for building non-blocking applications on the JVM based on the Reactive Streams Specification

Reactor 是一个第四代响应式编程框架,用于构建非阻塞 JVM 应用程序,基于 Reactive Streams Specification 来实现。

Reactor Operators and Schedulers can sustain high throughput rates on the order of 10's of millions of messages per second.

Reactor 的操作和调度可以提供每秒千万条消息的高吞吐量。

Plus its low memory footprint should go under most of the radars.

再加上它的低内存占用,应该在大多数雷达(radars)之下。咳咳咳,这个 radars 怎么翻译。

简单来说,Reactor 说是一个响应式编程框架,又快又不占用内存的那种。😈

关于 Reactor 的使用,这里艿艿就不过多介绍,感兴趣的胖友,可以看看 《使用 Reactor 进行反应式编程》 文章。如下是对其中的一段内容的节选并修改:

Reactor 有两个非常重要的基本概念:

  • Flux ,表示的是包含 0 到 N 个元素的异步序列。当消息通知产生时,订阅者(Subscriber)中对应的方法 #onNext(t), #onComplete(t)#onError(t) 会被调用。
  • Mono 表示的是包含 0 或者 1 个元素的异步序列。该序列中同样可以包含与 Flux 相同的三种类型的消息通知。
  • 同时,Flux 和 Mono 之间可以进行转换。例如:
    • 对一个 Flux 序列进行计数操作,得到的结果是一个 Mono<Long> 对象。
    • 把两个 Mono 序列合并在一起,得到的是一个 Flux 对象。

😈 其实,可以先暂时简单把 Mono 理解成 Object ,Flux 理解成 List 。嘿嘿~

1.3 Spring WebFlux

Spring 官方文档对 Spring WebFlux 介绍如下:

FROM https://docs.spring.io/spring-framework/docs/5.0.0.BUILD-SNAPSHOT/spring-framework-reference/html/web-reactive.html

Spring Framework 5 includes a new spring-webflux module. The module contains support for reactive HTTP and WebSocket clients as well as for reactive server web applications including REST, HTML browser, and WebSocket style interactions.

Spring Framework 5 提供了一个新的 spring-webflux 模块。该模块包含了:

  • 对响应式支持的 HTTP 和 WebSocket 客户端。
  • 对响应式支持的 Web 服务器,包括 Rest API、HTML 浏览器、WebSocket 等交互方式。

On the server-side WebFlux supports 2 distinct programming models:

  • Annotation-based with @Controller and the other > annotations supported also with Spring MVC
  • Functional, Java 8 lambda style routing and handling

在服务端方面,WebFlux 提供了 2 种编程模型(翻译成使用方式,可能更易懂):

  • 方式一,基于 Annotated Controller 方式实现:基于 @Controller 和 SpringMVC 使用的其它注解。😈 也就是说,我们大体上可以像使用 SpringMVC 的方式,使用 WebFlux 。
  • 方式二,基于函数式编程方式:函数式,Java 8 lambda 表达式风格的路由和处理。😈 可能有点晦涩,晚点我们看了示例就会明白。

Both programming models are executed on the same reactive foundation that adapts non-blocking HTTP runtimes to the Reactive Streams API.

这两个编程模型,都是在同一个响应式基础(foundation)上执行的,该基础将非阻塞 HTTP 运行时(runtime)适配成响应式 API 。😈 简单来说,就是将原有的 API ,使用 Reactor 封装成响应式 API ,让我们开发者使用更加便捷。

The diagram below shows the server-side stack including traditional, Servlet-based Spring MVC on the left from the spring-webmvc module and also the reactive stack on the right from the spring-webflux module.

下图显示了服务端的技术栈,左侧是 spring-webmvc 模块中传统的、基于 Servlet 的 Spring MVC ,右侧是 spring-webflux 模块中的响应式技术栈。

webflux-overview
  • 😈 仔细看第一层的两个框框,分别是上面提到的 WebFlux 的两种编程模型。表达的是 SpringMVC 不支持 Router Functions 方式,而 WebFlux 支持。

WebFlux can run on Servlet containers with support for the Servlet 3.1 Non-Blocking IO API as well as on other async runtimes such as Netty and Undertow.

WebFlux 可以运行在:

  • 支持 Servlet 3.1 非阻塞 IO API 的 Servlet 容器上
  • 也可以运行在支持异步运行时的,例如说 Netty 或者 Undertow 上

Each runtime is adapted to a reactive ServerHttpRequest and ServerHttpResponse exposing the body of the request and response as Flux, rather than InputStream and OutputStream, with reactive backpressure.

每一个运行时(runtime)适用于将响应式的 ServerHttpRequest 和 ServerHttpResponse 中 request 和 response 的 body 暴露成 Flux<DataBuffer> 对象,而不是 InputStream  和 InputStream  对象,可用于响应式中的背压(backpressure)。😈 这段有点晦涩,简单来说:

  • 对于 Servlet 来说, ServletRequest#getInputStream() 方法,获得请求的主体内容返回的是 InputStream 对象。
  • 对于 WebFlux 来说,ServerHttpRequest#getBody() 方法,获得请求的主体内容返回的是 Flux<DataBuffer> 对象。

REST-style JSON and XML serialization and deserialization is supported on top as a Flux<Object>, and so is HTML view rendering and Server-Sent Events.

REST 风格 API 使用到的 JSON 和 XML 序列化和反序列化,需要提供对 Flux<Object> 的支持。对于 HTML 渲染,和 SSE 也要提供对 Flux<Object> 的支持。

😈 咳咳咳,看完了这一大段,是不是突然有点想捶死艿艿,说的什么 XX 玩样啊!其实,在我们初学 SpringMVC 的时候,也是一脸懵逼的学完。随着我们对 SpringMVC 的日趋熟练,逐步对其提供的组件、原理、源码慢慢熟悉。所以,对于我们来说,WebFlux 乃至响应式编程来说,都是足够新颖的知识,我们要抱着空杯心态,「Stay Hungry, Stay Foolish」 。

如果胖友的时间比较充分,可以选择把 《Spring 文档 —— Web on Reactive Stack》 仔细看看,详尽的介绍了 Spring 在 Web 方面,响应式相关的技术栈。

虽然说上面我们在介绍 WebFlux ,把它搞的很复杂,实际在快速入门使用它,还是非常简单的。下面,开始让我们开始愉快的快速入门下~

艿艿:考虑到艿艿之前已经写了 《芋道 Spring Boot SpringMVC 入门》 文章,所以本文我们提供的示例,尽量覆盖到在 SpringMVC 提到的内容。

当然,很多相似的概念,艿艿也不重复介绍,不然显得我老啰嗦了。

2. 快速入门

示例代码对应仓库:lab-27-webflux-01 。

本小节,我们会使用 spring-boot-starter-webflux 实现 WebFlux 的自动化配置。然后实现用户的增删改查接口。接口列表如下:

请求方法URL功能
GET/users/list查询用户列表
GET/users/get获得指定用户编号的用户
POST/users/add添加用户
POST/users/update更新指定用户编号的用户
POST/users/delete删除指定用户编号的用户

下面,开始遨游~

2.1 引入依赖

pom.xml 文件中,引入相关依赖。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>lab-27-webflux-01</artifactId>

    <dependencies>
        <!-- 实现对 Spring WebFlux 的自动化配置 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

        <!-- 方便等会写单元测试 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

    </dependencies>

</project>
  • 具体每个依赖的作用,胖友自己认真看下艿艿添加的所有注释噢。

我们使用 IDEA Maven 插件 ,查看下 spring-boot-starter-webflux 依赖中,所引入的依赖。如下图所示:

  • 引入 reactor-core 依赖,使用 Reactor 作为 WebFlux 的响应式框架的基础。
  • 引入 spring-boot-starter-reactor-netty 依赖,使用 Netty 构建 WebFlux 的 Web 服务器。其中 RxNetty 库,是基于 Reactor 的响应式框架的基础之上,提供出 Netty 的响应式 API 。

当然,我们除了使用可以使用其它作为 WebFlux 的 Web 服务器,如下表格:

Server nameServer API usedReactive Streams support
NettyNetty APIReactor Netty
UndertowUndertow APIspring-web: Undertow to Reactive Streams bridge
TomcatServlet 3.1 non-blocking I/O; Tomcat API to read and write ByteBuffers vs byte[]spring-web: Servlet 3.1 non-blocking I/O to Reactive Streams bridge
JettyServlet 3.1 non-blocking I/O; Jetty API to write ByteBuffers vs byte[]spring-web: Servlet 3.1 non-blocking I/O to Reactive Streams bridge
Servlet 3.1 containerServlet 3.1 non-blocking I/Ospring-web: Servlet 3.1 non-blocking I/O to Reactive Streams bridge
  • 当然,也需要基于 Reactor 的响应式框架的基础之上,封装相应的响应式 API 。

可能胖友会有疑惑,为什么 WebFlux 运行在 Servlet 容器上时,需要 Servlet 3.1+ 以上的容器呢?在 Servlet 3.1 规范发布时,它定义了非常重要的特性,Non-blocking I/O 非阻塞 IO ,提供了异步处理请求的支持。我们来详细展开下:

  • 在 Servlet 3.1 规范之前的版本,请求是只能被 Servlet 同步阻塞处理完成,返回结果给前端。
  • 在 Servlet 3.1 规范开始的版本,请求是允许被 Servlet 丢到线程池中处执行,等到执行完毕,异步回调结果给 Servlet ,最后返回给前端。

艿艿:推荐胖友在阅读完本文之后,可以看看 《Servlet 3.0/3.1 中的异步处理》 文章,可以对 WebFlux 有更好的理解。

2.2 Application

创建 Application.java 类,配置 @SpringBootApplication 注解即可。代码如下:

// Application.java

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}
  • 先暂时不启动项目。等我们添加好 API 接口。

在 「1.3 Spring WebFlux」 小节中,我们提到了 WebFlux 有两种编程模型,分别是:

  • 方式一,基于 Annotated Controller 方式实现
  • 方式二,基于函数式编程方式

我们分别在下面两个小节来看看。

2.3 基于 Annotated Controller 方式实现

cn.iocoder.springboot.lab27.springwebflux.controller 包路径下,创建 UserController 类。代码如下:

// UserController.java

@RestController
@RequestMapping("/users")
public class UserController {

    /**
     * 查询用户列表
     *
     * @return 用户列表
     */

    @GetMapping("/list")
    public Flux<UserVO> list() {
        // 查询列表
        List<UserVO> result = new ArrayList<>();
        result.add(new UserVO().setId(1).setUsername("yudaoyuanma"));
        result.add(new UserVO().setId(2).setUsername("woshiyutou"));
        result.add(new UserVO().setId(3).setUsername("chifanshuijiao"));
        // 返回列表
        return Flux.fromIterable(result);
    }

    /**
     * 获得指定用户编号的用户
     *
     * @param id 用户编号
     * @return 用户
     */

    @GetMapping("/get")
    public Mono<UserVO> get(@RequestParam("id") Integer id) {
        // 查询用户
        UserVO user = new UserVO().setId(id).setUsername("username:" + id);
        // 返回
        return Mono.just(user);
    }

    /**
     * 添加用户
     *
     * @param addDTO 添加用户信息 DTO
     * @return 添加成功的用户编号
     */

    @PostMapping("add")
    public Mono<Integer> add(@RequestBody Publisher<UserAddDTO> addDTO) {
        // 插入用户记录,返回编号
        Integer returnId = 1;
        // 返回用户编号
        return Mono.just(returnId);
    }

    /**
     * 更新指定用户编号的用户
     *
     * @param updateDTO 更新用户信息 DTO
     * @return 是否修改成功
     */

    @PostMapping("/update")
    public Mono<Boolean> update(@RequestBody Publisher<UserUpdateDTO> updateDTO) {
        // 更新用户记录
        Boolean success = true;
        // 返回更新是否成功
        return Mono.just(success);
    }

    /**
     * 删除指定用户编号的用户
     *
     * @param id 用户编号
     * @return 是否删除成功
     */

    @PostMapping("/delete"// URL 修改成 /delete ,RequestMethod 改成 DELETE
    public Mono<Boolean> delete(@RequestParam("id") Integer id) {
        // 删除用户记录
        Boolean success = false;
        // 返回是否更新成功
        return Mono.just(success);
    }

}
  • 在类和方法上,我们添加了 @Controller 和 SpringMVC 在使用的 @GetMappingPostMapping 等注解,提供 API 接口,这个和我们在使用 SpringMVC 是一模一样的。
  • dtovo 包下,有 API 使用到的 DTO 和 VO 类。
  • 😈 因为是入门示例,我们会发现代码十分简单,保持淡定。在后文中,我们会提供和 Spring Data JPA、Spring Data MongoDB、Spring Data Redis 等等整合的示例。
  • #list() 方法,我们最终调用 Flux#fromIterable(Iterable<? extends T> it) 方法,将 List 包装成 Flux 对象返回。
  • #get(Integer id) 方法,我们最终调用 Mono#just(T data) 方法,将 UserVO 包装成 Mono 对象返回。
  • #add(Publisher<UserAddDTO> addDTO) 方法,参数为 Publisher 类型,泛型为 UserAddDTO 类型,并且添加了 @RequestBody 注解,从 request 的 Body 中读取参数。注意,此时提交参数需要使用 "application/json" 等 Content-Type 内容类型。
  • #add(...) 方法,也可以使用 application/x-www-form-urlencodedmultipart/form-data 这两个 Content-Type 内容类型,通过 request 的 Form Data 或 Multipart Data 传递参数。代码如下:
    // UserController.java

    /**
     * 添加用户
     *
     * @param addDTO 添加用户信息 DTO
     * @return 添加成功的用户编号
     */

    @PostMapping("add2")
    public Mono<Integer> add(Mono<UserAddDTO> addDTO) {
        // 插入用户记录,返回编号
        Integer returnId = UUID.randomUUID().hashCode();
        // 返回用户编号
        return Mono.just(returnId);
    }
    • 此时,参数为 Mono 类型,泛型为 UserAddDTO 类型。
    • 当然,我们也可以直接使用参数为 UserAddDTO 类型。如果后续需要使用到 Reactor API ,则我们自己主动调用 Mono#just(T data) 方法,封装出 Publisher 对象。😈 注意,Flux 和 Mono 都实现了 Publisher 接口。
    • 可能有胖友不了解 request Form Data、Multipart Data 和 request Body 的差异,可以看看 《HTTP 请求中 request payload 和 formData 区别?》 文章。
    • WebFlux 对于 Form Data ,在 《Web on Reactive Stack —— Spring WebFlux —— Form Data》 有简短说明。
    • WebFlux 对于 Multipart Data  ,在 《Web on Reactive Stack —— Spring WebFlux —— Multipart Data 》 有简短说明。
  • #update(Publisher<UserUpdateDTO> updateDTO) 方法,和 #add(Publisher<UserAddDTO> addDTO) 方法一致,就不重复赘述。
  • #delete(Integer id) 方法,和 #get(Integer id) 方法一致,就不重复赘述。

2.4 基于函数式编程方式

cn.iocoder.springboot.lab27.springwebflux.controller 包路径下,创建 UserRouter 类。代码如下:

// UserRouter.java

@Configuration
public class UserRouter {

    @Bean
    public RouterFunction<ServerResponse> userListRouterFunction() {
        return RouterFunctions.route(RequestPredicates.GET("/users2/list"),
                new HandlerFunction<ServerResponse>() {

                    @Override
                    public Mono<ServerResponse> handle(ServerRequest request) {
                        // 查询列表
                        List<UserVO> result = new ArrayList<>();
                        result.add(new UserVO().setId(1).setUsername("yudaoyuanma"));
                        result.add(new UserVO().setId(2).setUsername("woshiyutou"));
                        result.add(new UserVO().setId(3).setUsername("chifanshuijiao"));
                        // 返回列表
                        return ServerResponse.ok().bodyValue(result);
                    }

                });
    }

    @Bean
    public RouterFunction<ServerResponse> userGetRouterFunction() {
        return RouterFunctions.route(RequestPredicates.GET("/users2/get"),
                new HandlerFunction<ServerResponse>() {

                    @Override
                    public Mono<ServerResponse> handle(ServerRequest request) {
                        // 获得编号
                        Integer id = request.queryParam("id")
                                .map(s -> StringUtils.isEmpty(s) ? null : Integer.valueOf(s)).get();
                        // 查询用户
                        UserVO user = new UserVO().setId(id).setUsername(UUID.randomUUID().toString());
                        // 返回列表
                        return ServerResponse.ok().bodyValue(user);
                    }

                });
    }

    @Bean
    public RouterFunction<ServerResponse> demoRouterFunction() {
        return route(GET("/users2/demo"), request -> ok().bodyValue("demo"));
    }

}
  • 在类上,添加 @Configuration 注解,保证该类中的 Bean 们,都被扫描到。

  • 在每个方法中,我们都通弄 RouterFunctions#route(RequestPredicate predicate, HandlerFunction<T> handlerFunction) 方法,定义了一条路由。

    • 第一个参数 predicate 参数,是 RequestPredicate 类型,请求谓语,用于匹配请求。可以通过 RequestPredicates 来构建各种条件。
    • 第二个参数 handlerFunction 参数,是 RouterFunction 类型,处理器函数。
  • 每个方法定义的路由,胖友自己看下代码,一眼能看的明白。一般来说,采用第三个方法的写法,更加简洁。注意,需要使用 static import 静态引入,代码如下:

    import static org.springframework.web.reactive.function.server.RequestPredicates.*;
    import static org.springframework.web.reactive.function.server.RouterFunctions.*;
    import static org.springframework.web.reactive.function.server.ServerResponse.*;

一般来说,艿艿更加推荐基于 Annotated Controller 方式实现的编程方式,更符合我们现在的开发习惯,学习成本也相对低一些。同时,和 API 接口文档工具 Swagger 也更容易集成。

😈 有没觉得每个 HandlerFunction 函数,和每个 Servlet 有点像。

更多基于函数式编程方式的示例,可以看看如下两篇文章:

  • 《Introduction to the Functional Web Framework in Spring 5》
  • 《Spring Boot RouterFunction tutorial》

3. 测试接口

示例代码对应仓库:lab-27-webflux-01 。

在开发完接口,我们会进行接口的自测。一般情况下,我们先启动项目,然后使用 Postman、curl、浏览器,手工模拟请求后端 API 接口。

实际上,WebFlux 提供了 Web 测试客户端 WebTestClient 类,方便我们快速测试接口。下面,我们对 UserController 提供的接口,进行下单元测试。也就是说,本小节,我们会继续在 lab-27-webflux-01 示例的基础上修改。

MockMvc 提供了集成测试和单元测试的能力,我们分成 「3.1 集成测试」 和 「3.2 单元测试」 来看。如果胖友对测试这块不太了解,可以看看如下两篇文章:

  • 《小谈 Java 单元测试》
  • 《谈谈单元测试》

3.1 集成测试

创建 UserControllerTest 测试类,我们来测试一下简单的 UserController 的每个操作。核心代码如下:

// UserControllerTest.java

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
@AutoConfigureWebFlux
@AutoConfigureWebTestClient
public class UserControllerTest {

    @Autowired
    private WebTestClient webClient;

    @Test
    public void testList() {
        webClient.get().uri("/users/list")
                .exchange() // 执行请求
                .expectStatus().isOk() // 响应状态码 200
                .expectBody().json("[\n" +
                "    {\n" +
                "        \"id\": 1,\n" +
                "        \"username\": \"yudaoyuanma\"\n" +
                "    },\n" +
                "    {\n" +
                "        \"id\": 2,\n" +
                "        \"username\": \"woshiyutou\"\n" +
                "    },\n" +
                "    {\n" +
                "        \"id\": 3,\n" +
                "        \"username\": \"chifanshuijiao\"\n" +
                "    }\n" +
                "]"); // 响应结果
    }

    @Test
    public void testGet() {
        // 获得指定用户编号的用户
        webClient.get().uri("/users/get?id=1")
                .exchange() // 执行请求
                .expectStatus().isOk() // 响应状态码 200
                .expectBody().json("{\n" +
                "    \"id\": 1,\n" +
                "    \"username\": \"username:1\"\n" +
                "}"); // 响应结果
    }

    @Test
    public void testGet2() {
        // 获得指定用户编号的用户
        webClient.get().uri("/users/v2/get?id=1")
                .exchange() // 执行请求
                .expectStatus().isOk() // 响应状态码 200
                .expectBody().json("{\n" +
                "    \"id\": 1,\n" +
                "    \"username\": \"test\"\n" +
                "}"); // 响应结果
    }

    @Test
    public void testAdd() {
        Map<String, Object> params = new HashMap<>();
        params.put("username""yudaoyuanma");
        params.put("password""nicai");
        // 添加用户
        webClient.post().uri("/users/add")
                .bodyValue(params)
                .exchange() // 执行请求
                .expectStatus().isOk() // 响应状态码 200
                .expectBody().json("1"); // 响应结果。因为没有提供 content 的比较,所以只好使用 json 来比较。竟然能通过
    }

    @Test
    public void testAdd2() // 发送文件的测试,可以参考 https://dev.to/shavz/sending-multipart-form-data-using-spring-webtestclient-2gb7 文章
        BodyInserters.FormInserter<String> formData = // Form Data 数据,需要这么拼凑
                BodyInserters.fromFormData("username""yudaoyuanma")
                .with("password""nicai");
        // 添加用户
        webClient.post().uri("/users/add2")
                .body(formData)
                .exchange() // 执行请求
                .expectStatus().isOk() // 响应状态码 200
                .expectBody().json("1"); // 响应结果。因为没有提供 content 的比较,所以只好使用 json 来比较。竟然能通过
    }


    @Test
    public void testUpdate() {
        Map<String, Object> params = new HashMap<>();
        params.put("id"1);
        params.put("username""yudaoyuanma");
        // 修改用户
        webClient.post().uri("/users/update")
                .bodyValue(params)
                .exchange() // 执行请求
                .expectStatus().isOk() // 响应状态码 200
                .expectBody(Boolean.class) // 期望返回值类型是 Boolean
                .consumeWith((Consumer<EntityExchangeResult<Boolean>>) result -> // 通过消费结果,判断符合是 true 。
                        Assert.assertTrue("返回结果需要为 true", result.getResponseBody()));
    }

    @Test
    public void testDelete() {
        // 删除用户
        webClient.post().uri("/users/delete?id=1")
                .exchange() // 执行请求
                .expectStatus().isOk() // 响应状态码 200
                .expectBody(Boolean.class) // 期望返回值类型是 Boolean
                .isEqualTo(true); // 这样更加简洁一些
//                .consumeWith((Consumer<EntityExchangeResult<Boolean>>) result -> // 通过消费结果,判断符合是 true 。
//                        Assert.assertTrue("返回结果需要为 true", result.getResponseBody()));
    }

}
  • 在类上,我们添加了 @AutoConfigureWebTestClient 注解,用于自动化配置我们稍后注入的 WebTestClient Bean 对象 webClient 。在后续的测试中,我们会看到都是通过 webClient 调用后端 API 接口。而每一次调用后端 API 接口,都会执行真正的后端逻辑。因此,整个逻辑,走的是集成测试,会启动一个真实的 Spring 环境。
  • 每次 API 接口的请求,都通过 RequestHeadersSpec 来构建。构建完成后,通过 RequestHeadersSpec#exchange() 方法来执行请求,返回 ResponseSpec 结果。
    • WebTestClient 的 #get()#head()#delete()#options() 方法,返回的是 RequestHeadersUriSpec 对象。
    • WebTestClient 的 #post()#put()#delete()#patch() 方法,返回的是 RequestBodyUriSpec 对象。
    • RequestHeadersUriSpec 和 RequestBodyUriSpec 都继承了 RequestHeadersSpec 接口。
  • 执行完请求后,通过调用 RequestBodyUriSpec 的各种断言方法,添加对结果的预期,相当于做断言。如果不符合预期,则会抛出异常,测试不通过。

3.2 单元测试

为了更好的展示 WebFlux 单元测试的示例,我们需要改写 UserController 的代码,让其会依赖 UserService 。修改点如下:

  • cn.iocoder.springboot.lab27.springwebflux.service 包路径下,创建 UserService 类。代码如下:

    // UserService.java

    @Service
    public class UserService {

        public UserVO get(Integer id) {
            return new UserVO().setId(id).setUsername("test");
        }

    }
  • 在 UserController 类中,增加 GET /users/v2/get 接口,获得指定用户编号的用户。代码如下:

    // UserController.java

    @Autowired
    private UserService userService;

    /**
     * 获得指定用户编号的用户
     *
     * @param id 用户编号
     * @return 用户
     */

    @GetMapping("/v2/get")
    public Mono<UserVO> get2(@RequestParam("id") Integer id) {
        // 查询用户
        UserVO user = userService.get(id);
        // 返回
        return Mono.just(user);
    }
    • 在代码中,我们注入了 UserService Bean 对象 userService ,然后在新增的接口方法中,会调用 UserService#get(Integer id) 方法,获得指定用户编号的用户。

创建 UserControllerTest2 测试类,我们来测试一下简单的 UserController 的新增的这个 API 操作。代码如下:

// UserControllerTest2.java

@RunWith(SpringRunner.class)
@WebFluxTest(UserController.class)
public class UserControllerTest2 {

    @Autowired
    private WebTestClient webClient;

    @MockBean
    private UserService userService;

    @Test
    public void testGet2() throws Exception {
        // Mock UserService 的 get 方法
        System.out.println("before mock:" + userService.get(1)); // <1.1>
        Mockito.when(userService.get(1)).thenReturn(
                new UserVO().setId(1).setUsername("username:1")); // <1.2>
        System.out.println("after mock:" + userService.get(1)); // <1.3>

        // 查询用户列表
        webClient.get().uri("/users/v2/get?id=1")
                .exchange() // 执行请求
                .expectStatus().isOk() // 响应状态码 200
                .expectBody().json("{\n" +
                "    \"id\": 1,\n" +
                "    \"username\": \"username:1\"\n" +
                "}"); // 响应结果
    }

}
  • 在类上添加 @WebFluxTest 注解,并且传入的是 UserController 类,表示我们要对 UserController 进行单元测试。
  • 同时,@WebFluxTest 注解,是包含了 @UserController 的组合注解,所以它会自动化配置我们稍后注入的 WebTestClient Bean 对象 mvc 。在后续的测试中,我们会看到都是通过 webClient 调用后端 API 接口。但是!每一次调用后端 API 接口,并不会执行真正的后端逻辑,而是走的 Mock 逻辑。也就是说,整个逻辑,走的是单元测试会启动一个 Mock 的 Spring 环境。

艿艿:注意上面每个加粗的地方!

  • userService 属性,我们添加了 @MockBean 注解,实际这里注入的是一个使用 Mockito 创建的 UserService Mock 代理对象。如下图所示:
    • 打印的就是我们 Mock 返回的 UserVO 对象。
    • 结果竟然返回的是 null 空。理论来说,此时应该返回一个 id = 1 的 UserVO 对象。实际上,因为此时的 userService 是通过 Mockito 来 Mock 出来的对象,其所有调用它的方法,返回的都是空。
    • UserController 中,也会注入一个 UserService 属性,此时注入的就是该 Mock 出来的 UserService Bean 对象。

    • 默认情况下,

    • <1.1> 处,我们调用 UserService#get(Integer id) 方法,然后打印返回结果。执行结果如下:

      before mock:null
    • <1.2> 处,通过 Mockito 进行 Mock userService#get(Integer id) 方法,当传入的 id = 1 方法参数时,返回 id = 1 并且 username = "username:1" 的 UserVO 对象。

    • <1.3> 处,再次调用 UserService#get(Integer id) 方法,然后打印返回结果。执行结果如下:

      after cn.iocoder.springboot.lab27.springwebflux.vo.UserVO@23202c31
  • 后续,使用 webClient 完成一次后端 API 调用,并进行断言结果是否正确。执行成功,单元测试通过。

可能胖友对单元测试不是很了解,这里在额外推荐一本书 《有效的单元测试》 。很薄,周末抽几个小时就能读完。

如果觉得本小节还不够,可以看看 《SpringBoot WebFlux Test – @WebFluxTest》 文章,写的还是不错的。

4. 全局统一返回

示例代码对应仓库:lab-27-webflux-02 。

在我们提供后端 API 给前端时,我们需要告前端,这个 API 调用结果是否成功:

  • 如果成功,成功的数据是什么。后续,前端会取数据渲染到页面上。
  • 如果失败,失败的原因是什么。一般,前端会将原因弹出提示给用户。

这样,我们就需要有统一的返回结果,而不能是每个接口自己定义自己的风格。一般来说,统一的全局返回信息如下:

  • 成功时,返回成功的状态码 + 数据
  • 失败时,返回失败的状态码 + 错误提示

在标准的 RESTful API 的定义,是推荐使用 HTTP 响应状态码 返回状态码。一般来说,我们实践很少这么去做,主要有如下原因:

  • 业务返回的错误状态码很多,HTTP 响应状态码无法很好的映射。例如说,活动还未开始、订单已取消等等。
  • 国内开发者对 HTTP 响应状态码不是很了解,可能只知道 200、403、404、500 几种常见的。这样,反倒增加学习成本。

所以,实际项目在实践时,我们会将状态码放在 Response Body 响应内容中返回。

在全局统一返回里,我们至少需要定义三个字段:

  • code:状态码。无论是否成功,必须返回。

    关于这一块,也有团队实践时,增加了 success 字段,通过 truefalse 表示成功还是失败。这个看每个团队的习惯吧。艿艿的话,还是偏好基于约定,返回 0 时表示成功。

    • 成功时,状态码为 0 。
    • 失败时,对应业务的错误码。
  • data:数据。成功时,返回该字段。

  • message:错误提示。失败时,返回该字段。

那么,让我们来看两个示例:

// 成功响应
{
    code: 0,
    data: {
        id: 1,
        username: "yudaoyuanma"
    }
}

// 失败响应
{
    code: 233666,
    message: "徐妈太丑了"
}

下面,我们来看一个示例。

艿艿:考虑到不破坏 「2. 快速入门」 和 「3. 测试接口」 提供的示例,我们需要重新弄搭建一个。

4.1 引入依赖

在 「2.2 引入依赖」 一致。

4.2 Application

在 「2.3 Application」 一致。

4.3 CommonResult

cn.iocoder.springboot.lab27.springwebflux.core.vo 包路径,创建 CommonResult 类,用于全局统一返回。代码如下:

// CommonResult.java

public class CommonResult<Timplements Serializable {

    public static Integer CODE_SUCCESS = 0;

    /**
     * 错误码
     */

    private Integer code;
    /**
     * 错误提示
     */

    private String message;
    /**
     * 返回数据
     */

    private T data;

    /**
     * 将传入的 result 对象,转换成另外一个泛型结果的对象
     *
     * 因为 A 方法返回的 CommonResult 对象,不满足调用其的 B 方法的返回,所以需要进行转换。
     *
     * @param result 传入的 result 对象
     * @param <T> 返回的泛型
     * @return 新的 CommonResult 对象
     */

    public static <T> CommonResult<T> error(CommonResult<?> result) {
        return error(result.getCode(), result.getMessage());
    }

    public static <T> CommonResult<T> error(Integer code, String message) {
        Assert.isTrue(!CODE_SUCCESS.equals(code), "code 必须是错误的!");
        CommonResult<T> result = new CommonResult<>();
        result.code = code;
        result.message = message;
        return result;
    }

    public static <T> CommonResult<T> success(T data) {
        CommonResult<T> result = new CommonResult<>();
        result.code = CODE_SUCCESS;
        result.data = data;
        result.message = "";
        return result;
    }

    @JsonIgnore // 忽略,避免 jackson 序列化给前端
    public boolean isSuccess() // 方便判断是否成功
        return CODE_SUCCESS.equals(code);
    }

    @JsonIgnore // 忽略,避免 jackson 序列化给前端
    public boolean isError() // 方便判断是否失败
        return !isSuccess();
    }

    // ... 省略 setting/getting/toString 方法

}
  • 每个字段,胖友自己看相应的注释。

4.4 GlobalResponseBodyHandler

cn.iocoder.springboot.lab27.springwebflux.core.web 包路径,创建 GlobalResponseBodyHandler 类,全局统一返回的处理器。代码如下:

// GlobalResponseBodyHandler.java

public class GlobalResponseBodyHandler extends ResponseBodyResultHandler {

    private static Logger LOGGER = LoggerFactory.getLogger(GlobalResponseBodyHandler.class);

    private static MethodParameter METHOD_PARAMETER_MONO_COMMON_RESULT;

    private static final CommonResult COMMON_RESULT_SUCCESS = CommonResult.success(null);

    static {
        try {
            // <1> 获得 METHOD_PARAMETER_MONO_COMMON_RESULT 。其中 -1 表示 `#methodForParams()` 方法的返回值
            METHOD_PARAMETER_MONO_COMMON_RESULT = new MethodParameter(
                    GlobalResponseBodyHandler.class.getDeclaredMethod("methodForParams"), -1);
        } catch (NoSuchMethodException e) {
            LOGGER.error("[static][获取 METHOD_PARAMETER_MONO_COMMON_RESULT 时,找不都方法");
            throw new RuntimeException(e);
        }
    }

    public GlobalResponseBodyHandler(List<HttpMessageWriter<?>> writers, RequestedContentTypeResolver resolver) {
        super(writers, resolver);
    }

    public GlobalResponseBodyHandler(List<HttpMessageWriter<?>> writers, RequestedContentTypeResolver resolver, ReactiveAdapterRegistry registry) {
        super(writers, resolver, registry);
    }

    @Override
    @SuppressWarnings("unchecked")
    public Mono<Void> handleResult(ServerWebExchange exchange, HandlerResult result) {
        Object returnValue = result.getReturnValue();
        Object body;
        // <1.1> 处理返回结果为 Mono 的情况
        if (returnValue instanceof Mono) {
            body = ((Mono<Object>) result.getReturnValue())
                    .map((Function<Object, Object>) GlobalResponseBodyHandler::wrapCommonResult)
                    .defaultIfEmpty(COMMON_RESULT_SUCCESS);
        // <1.2> 处理返回结果为 Flux 的情况
        } else if (returnValue instanceof Flux) {
            body = ((Flux<Object>) result.getReturnValue())
                    .collectList()
                    .map((Function<Object, Object>) GlobalResponseBodyHandler::wrapCommonResult)
                    .defaultIfEmpty(COMMON_RESULT_SUCCESS);
        // <1.3> 处理结果为其它类型
        } else {
            body = wrapCommonResult(returnValue);
        }
        // <2>
        return writeBody(body, METHOD_PARAMETER_MONO_COMMON_RESULT, exchange);
    }

    private static Mono<CommonResult> methodForParams() {
        return null;
    }

    private static CommonResult<?> wrapCommonResult(Object body) {
        // 如果已经是 CommonResult 类型,则直接返回
        if (body instanceof CommonResult) {
            return (CommonResult<?>) body;
        }
        // 如果不是,则包装成 CommonResult 类型
        return CommonResult.success(body);
    }

}
  • 继承 WebFlux 的 ResponseBodyResultHandler 类,因为该类将 Response 的 body 写回给前端。所以,我们通过重写该类的 #handleResult(ServerWebExchange exchange, HandlerResult result) 方法,将返回结果进行使用 CommonResult 包装。
  • <1> 处,获得 METHOD_PARAMETER_MONO_COMMON_RESULT 。其中 -1 表示 #methodForParams() 方法的返回值类型 Mono<CommonResult> 。后续我们在#handleResult(ServerWebExchange exchange, HandlerResult result) 方法中,会使用到 METHOD_PARAMETER_MONO_COMMON_RESULT
  • 重写 #handleResult(ServerWebExchange exchange, HandlerResult result) 方法,将返回结果进行使用 CommonResult 包装。
    • <1.1> 处,处理返回结果为 Mono 的情况。通过调用 Mono#map(Function<? super T, ? extends R> mapper) 方法,将原返回结果,进行包装成 CommonResult<?>
    • <1.2> 处,处理返回结果为 Flux 的情况。先通过调用 Flux#collectList() 方法,将其转换成 Mono<List<T>> 对象,后续就是和 <1.1> 相同的逻辑。
    • <1.3> 处,处理结果为其它类型的情况,直接进行包装成 CommonResult<?>
  • <2> 处,调用父类方法 #writeBody(Object body, MethodParameter bodyParameter, ServerWebExchange exchange) 方法,实现将结果写回给前端。

在思路上,和 SpringMVC 使用 ResponseBodyAdvice + @ControllerAdvice 注解,是一致的。只是说,WebFlux 暂时没有提供这样的方式,所以咱只好通过继承 ResponseBodyResultHandler 类,重写其 #handleResult(ServerWebExchange exchange, HandlerResult result) 方法,将返回结果进行使用 CommonResult 包装。

4.5 WebFluxConfiguration

cn.iocoder.springboot.lab27.springwebflux.config 包路径下,创建 WebFluxConfiguration 配置类。代码如下:

// WebFluxConfiguration.java

@Configuration
public class WebFluxConfiguration {

    @Bean
    public GlobalResponseBodyHandler responseWrapper(ServerCodecConfigurer serverCodecConfigurer,
                                                     RequestedContentTypeResolver requestedContentTypeResolver)
 
{
        return new GlobalResponseBodyHandler(serverCodecConfigurer.getWriters(), requestedContentTypeResolver);
    }

}
  • #responseWrapper(serverCodecConfigurer, requestedContentTypeResolver) 方法中,我们创建了 4.4 GlobalResponseBodyHandler Bean 对象,实现对返回结果的包装。

4.6 UserController

cn.iocoder.springboot.lab27.springwebflux.controller 包路径下,创建 UserController 类。代码如下:

// UserController.java

@RestController
@RequestMapping("/users")
public class UserController {

    /**
     * 查询用户列表
     *
     * @return 用户列表
     */

    @GetMapping("/list")
    public Flux<UserVO> list() {
        // 查询列表
        List<UserVO> result = new ArrayList<>();
        result.add(new UserVO().setId(1).setUsername("yudaoyuanma"));
        result.add(new UserVO().setId(2).setUsername("woshiyutou"));
        result.add(new UserVO().setId(3).setUsername("chifanshuijiao"));
        // 返回列表
        return Flux.fromIterable(result);
    }

    /**
     * 获得指定用户编号的用户
     *
     * @param id 用户编号
     * @return 用户
     */

    @GetMapping("/get")
    public Mono<UserVO> get(@RequestParam("id") Integer id) {
        // 查询用户
        UserVO user = new UserVO().setId(id).setUsername("username:" + id);
        // 返回
        return Mono.just(user);
    }

    /**
     * 获得指定用户编号的用户
     *
     * @param id 用户编号
     * @return 用户
     */

    @GetMapping("/get2")
    public Mono<CommonResult<UserVO>> get2(@RequestParam("id") Integer id) {
        // 查询用户
        UserVO user = new UserVO().setId(id).setUsername("username:" + id);
        // 返回
        return Mono.just(CommonResult.success(user));
    }

    /**
     * 获得指定用户编号的用户
     *
     * @param id 用户编号
     * @return 用户
     */

    @GetMapping("/get3")
    public UserVO get3(@RequestParam("id") Integer id) {
        // 查询用户
        UserVO user = new UserVO().setId(id).setUsername("username:" + id);
        // 返回
        return user;
    }

    /**
     * 获得指定用户编号的用户
     *
     * @param id 用户编号
     * @return 用户
     */

    @GetMapping("/get4")
    public CommonResult<UserVO> get4(@RequestParam("id") Integer id) {
        // 查询用户
        UserVO user = new UserVO().setId(id).setUsername("username:" + id);
        // 返回
        return CommonResult.success(user);
    }

}
  • API 接口虽然比较多,但是我们可以先根据返回结果的类型,分成 Flux 和 Mono 两类。然后,艿艿这里又创建了 Mono 分类的四种情况的接口,就是 /users/get/users/get2/users/get3/users/get4 四个。胖友看下这四个接口的返回结果的类型,很容易就明白了。

  • #get(Integer id) 方法,返回的结果是 UserVO 类型。这样,结果会被 GlobalResponseBodyHandler 拦截,包装成 CommonResult 类型返回。请求结果如下:

    {
        "code"0,
        "message""",
        "data": {
            "id"10,
            "username""username:10"
        }
    }
    • 会有 "message": "" 的返回的原因是,我们使用 SpringMVC 提供的 Jackson 序列化,对于 CommonResult 此时的 message = null 的情况下,会序列化它成 "message": "" 返回。实际情况下,不会影响前端处理。
  • # get2(Integer id) 方法,返回的结果是 Mono<Common<UserVO>> 类型。结果虽然也会被 GlobalResponseBodyHandler 处理,但是不会二次再重复包装成 CommonResult 类型返回。

5. 全局异常处理

示例代码对应仓库:lab-27-webflux-02 。

在 「4. 全局统一返回」 中,我们已经定义了使用 CommonResult 全局统一返回,并且看到了成功返回的示例与代码。这一小节,我们主要是来全局异常处理,最终能也是通过 CommonResult 返回。

那么,我们就不哔哔,直接看着示例代码,遨游起来。

友情提示:该示例,基于 「4. 全局统一返回」 的 lab-27-webflux-02 的基础上,继续改造。

5.1 ServiceExceptionEnum

cn.iocoder.springboot.lab27.springwebflux.constants 包路径,创建 ServiceExceptionEnum 枚举类,枚举项目中的错误码。代码如下:

// ServiceExceptionEnum.java

public enum ServiceExceptionEnum {

    // ========== 系统级别 ==========
    SUCCESS(0"成功"),
    SYS_ERROR(2001001000"服务端发生异常"),
    MISSING_REQUEST_PARAM_ERROR(2001001001"参数缺失"),

    // ========== 用户模块 ==========
    USER_NOT_FOUND(1001002000"用户不存在"),

    // ========== 订单模块 ==========

    // ========== 商品模块 ==========
    ;

    /**
     * 错误码
     */

    private int code;
    /**
     * 错误提示
     */

    private String message;

    ServiceExceptionEnum(int code, String message) {
        this.code = code;
        this.message = message;
    }

    // ... 省略 getting 方法

}
  • 因为错误码是全局的,最好按照模块来拆分。如下是艿艿在 onemall 项目的实践:

    /**
     * 服务异常
     *
     * 参考 https://www.kancloud.cn/onebase/ob/484204 文章
     *
     * 一共 10 位,分成四段
     *
     * 第一段,1 位,类型
     *      1 - 业务级别异常
     *      2 - 系统级别异常
     * 第二段,3 位,系统类型
     *      001 - 用户系统
     *      002 - 商品系统
     *      003 - 订单系统
     *      004 - 支付系统
     *      005 - 优惠劵系统
     *      ... - ...
     * 第三段,3 位,模块
     *      不限制规则。
     *      一般建议,每个系统里面,可能有多个模块,可以再去做分段。以用户系统为例子:
     *          001 - OAuth2 模块
     *          002 - User 模块
     *          003 - MobileCode 模块
     * 第四段,3 位,错误码
     *       不限制规则。
     *       一般建议,每个模块自增。
     */

5.2 ServiceException

我们在一起讨论下 Service 逻辑异常的时候,如何进行返回。这里的逻辑异常,我们指的是,例如说用户名已经存在,商品库存不足等。一般来说,常用的方案选择,有两种:

  • 封装统一的业务异常类 ServiceException ,里面有错误码和错误提示,然后进行 throws 抛出。
  • 封装通用的返回类 CommonResult ,里面有错误码和错误提示,然后进行 return 返回。

一开始,我们选择了 CommonResult ,结果发现如下情况:

  • 因为 Spring @Transactional 声明式事务,是基于异常进行回滚的,如果使用 CommonResult 返回,则事务回滚会非常麻烦。
  • 当调用别的方法时,如果别人返回的是 CommonResult 对象,还需要不断的进行判断,写起来挺麻烦的。

所以,后来我们采用了抛出业务异常 ServiceException 的方式。

cn.iocoder.springboot.lab27.springwebflux.core.exception 包路径,创建 ServiceException 异常类,继承 RuntimeException 异常类,用于定义业务异常。代码如下:

// ServiceException.java

public final class ServiceException extends RuntimeException {

    /**
     * 错误码
     */

    private final Integer code;

    public ServiceException(ServiceExceptionEnum serviceExceptionEnum) {
        // 使用父类的 message 字段
        super(serviceExceptionEnum.getMessage());
        // 设置错误码
        this.code = serviceExceptionEnum.getCode();
    }

    // ... 省略 getting 方法

}
  • 提供传入 serviceExceptionEnum 参数的构造方法。具体的处理,看下代码和注释。

5.3 GlobalExceptionHandler

cn.iocoder.springboot.lab27.springwebflux.core.web 包路径,创建 GlobalExceptionHandler 类,全局统一返回的处理器。代码如下:

// GlobalExceptionHandler.java

@ControllerAdvice(basePackages = "cn.iocoder.springboot.lab27.springwebflux.controller")
public class GlobalExceptionHandler {

    private Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * 处理 ServiceException 异常
     */

    @ResponseBody
    @ExceptionHandler(value = ServiceException.class)
    public CommonResult serviceExceptionHandler(ServiceException ex) {
        logger.debug("[serviceExceptionHandler]", ex);
        // 包装 CommonResult 结果
        return CommonResult.error(ex.getCode(), ex.getMessage());
    }

    /**
     * 处理 ServerWebInputException 异常
     *
     * WebFlux 参数不正确
     */

    @ResponseBody
    @ExceptionHandler(value = ServerWebInputException.class)
    public CommonResult serverWebInputExceptionHandler(ServerWebInputException ex) {
        logger.debug("[ServerWebInputExceptionHandler]", ex);
        // 包装 CommonResult 结果
        return CommonResult.error(ServiceExceptionEnum.MISSING_REQUEST_PARAM_ERROR.getCode(),
                ServiceExceptionEnum.MISSING_REQUEST_PARAM_ERROR.getMessage());
    }

    /**
     * 处理其它 Exception 异常
     */

    @ResponseBody
    @ExceptionHandler(value = Exception.class)
    public CommonResult exceptionHandler(Exception e) {
        // 记录异常日志
        logger.error("[exceptionHandler]", e);
        // 返回 ERROR CommonResult
        return CommonResult.error(ServiceExceptionEnum.SYS_ERROR.getCode(),
                ServiceExceptionEnum.SYS_ERROR.getMessage());
    }

}
  • 在 WebFlux 中,可以使用通过实现 ResponseBodyAdvice 接口,并添加 @ControllerAdvice 接口,拦截 Controller 的返回结果。注意,我们这里 @ControllerAdvice 注解,设置了 basePackages 属性,只拦截 "cn.iocoder.springboot.lab27.springwebflux.controller" 包,也就是我们定义的 Controller 。为什么呢?因为在项目中,我们可能会引入 Swagger 等库,也使用 Controller 提供 API 接口,那么我们显然不应该让 GlobalResponseBodyHandler 去拦截这些接口,毕竟它们并不需要我们去替它们做全局统一的返回
  • 我们定义了三个方法,通过添加 @ExceptionHandler 注解,定义每个方法对应处理的异常。并且,也添加了 @ResponseBody 注解,标记直接使用返回结果作为 API 的响应。
  • #serviceExceptionHandler(...) 方法,拦截处理 ServiceException 业务异常,直接使用该异常的 code + message 属性,构建出 CommonResult 对象返回。
  • #serverWebInputExceptionHandler(...) 方法,拦截处理 ServerWebInputException 请求参数异常,构建出错误码为 ServiceExceptionEnum.MISSING_REQUEST_PARAM_ERROR 的 CommonResult 对象返回。
  • #exceptionHandler(...) 方法,拦截处理 Exception 异常,构建出错误码为 ServiceExceptionEnum.SYS_ERROR 的 CommonResult 对象返回。这是一个兜底的异常处理,避免有一些其它异常,我们没有在 GlobalExceptionHandler 中,提供自定义的处理方式。

注意,在 #exceptionHandler(...) 方法中,我们还多使用 logger 打印了错误日志,方便我们接入 ELK 等日志服务,发起告警,通知我们去排查解决。如果胖友的系统里暂时没有日志服务,可以记录错误日志到数据库中,也是不错的选择。而其它两个方法,因为是更偏业务的,相对正常的异常,所以无需记录错误日志。

5.4 UserController

在 UserController 类中,我们添加两个 API 接口,抛出异常,方便我们测试全局异常处理的效果。代码如下:

// UserController.java

/**
 * 测试抛出 NullPointerException 异常
 */

@GetMapping("/exception-01")
public UserVO exception01() {
    throw new NullPointerException("没有粗面鱼丸");
}

/**
 * 测试抛出 ServiceException 异常
 */

@GetMapping("/exception-02")
public UserVO exception02() {
    throw new ServiceException(ServiceExceptionEnum.USER_NOT_FOUND);
}
  • #exception01() 方法,抛出 NullPointerException 异常。这样,异常会被 GlobalExceptionHandler#exceptionHandler(...) 方法来拦截,包装成 CommonResult 类型返回。请求结果如下:

    {
        "code"2001001000,
        "message""服务端发生异常",
        "data"null
    }
  • #exception02() 方法,抛出 ServiceException 异常。这样,异常会被 GlobalExceptionHandler#serviceExceptionHandler(...) 方法来拦截,包装成 CommonResult 类型返回。请求结果如下:

    {
        "code"1001002000,
        "message""用户不存在",
        "data"null
    }

5.5 简单小结

采用 ControllerAdvice + @ExceptionHandler 注解的方式,可以很方便的实现 WebFlux 的全局异常处理。不过这种方案存在一个弊端,不支持 WebFlux 的基于函数式编程方式。不过考虑到,绝大多数情况下,我们并不会采用基于函数式编程方式,所以这种方案还是没问题的。看了下 WebFlux 的官方文档,也是推荐这种方案,详细可见 《Web on Reactive Stack —— Spring WebFlux —— Managing Exceptions》 。

如果胖友真的需要支持 WebFlux 的基于函数式编程方式,可以看看 《Handling Errors in Spring WebFlux》 文章,通过继承 org.springframework.boot.autoconfigure.web.reactive.error.AbstractErrorWebExceptionHandler 抽象类,实现自定义的全局异常处理器。

6. WebFilter 过滤器

示例代码对应仓库:lab-27-webflux-02 。

在 SpringMVC 中,我们可以通过实现 HandlerInterceptor 接口,拦截 SpringMVC 处理请求的过程,自定义前置和处理的逻辑。不了解这块的胖友,可以看看 《芋道 Spring Boot SpringMVC 入门》 的 「6. HandlerInterceptor 拦截器」 小节。

在 WebFlux 中,我们可以通过实现 WebFilter 接口,过滤 WebFlux 处理请求的过程,自定义前置和处理的逻辑。该接口代码如下:

// DemoWebFilterWebFilter.java

/**
 * Contract for interception-style, chained processing of Web requests that may
 * be used to implement cross-cutting, application-agnostic requirements such
 * as security, timeouts, and others.
 *
 * @author Rossen Stoyanchev
 * @since 5.0
 */

public interface WebFilter {

 /**
  * Process the Web request and (optionally) delegate to the next
  * {@code WebFilter} through the given {@link WebFilterChain}.
  * @param exchange the current server exchange
  * @param chain provides a way to delegate to the next filter
  * @return {@code Mono<Void>} to indicate when request processing is complete
  */

 Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain);

}
  • 因为 WebFilterChain 的 #filter(ServerWebExchange exchange) 方法,返回的是 Mono<Void> 对象,所以可以进行各种 Reactor 的操作。咳咳咳,当然需要胖友比较了解 Reactor 的使用,我们才能实现出的 WebFilter ,否则会觉得挺难用的。
  • 另外,WebFilterChain 是由多个 WebFilter 过滤器组成的链,其默认的实现为 DefaultWebFilterChain 。
  • 总体来说,从形态上和我们在 Servlet 看到的 FilterChain 和 Filter 是比较相似的,只是因为结合了 Reactor 响应式编程,所以编写时,差异蛮大的。

6.1 DemoWebFilter

下面,让我们来编写一个简单的 WebFilter 示例。

cn.iocoder.springboot.lab27.springwebflux.core.filter 包路径,创建 DemoWebFilter 类,一个简单的 WebFilter 示例。代码如下:

// DemoWebFilter.java

@Component
@Order(1)
public class DemoWebFilter implements WebFilter {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public Mono<Void> filter(ServerWebExchange serverWebExchange, WebFilterChain webFilterChain) {
        // <1> 继续执行请求
        return webFilterChain.filter(serverWebExchange)
                .doOnSuccess(new Consumer<Void>() { // <2> 执行成功后回调

                    @Override
                    public void accept(Void aVoid) {
                        logger.info("[accept][执行成功]");
                    }

                });
    }

}
  • 在类上,添加 @Component 注解,创建 DemoWebFilter Bean 对象。这样,该过滤器就已经加入了 WebFlux 的过滤器链中。目前,暂未内置支持根据请求路径 uri 等条件来配置是否过滤,需要我们自己在实现 #filter(serverWebExchange, webFilterChain) 方法来完成。
  • 在类上,添加 @Order 注解,设置过滤器的顺序。
  • 实现 #filter(serverWebExchange, webFilterChain) 方法,实现在请求执行完成后,打印一条执行成功的日志。
    • <1> 处,调用 WebFilterChain#filter(exchange) 方法,交给过滤器中的下一个过滤器,继续进行过滤处理,并返回 Mono<Void> 对象。
    • <2> 处,调用 Mono#doOnSuccess(Consumer<? super T> onSuccess) 方法,实现在请求执行完成后,打印一条执行成功的日志。这里,我们可以参考 《Reactor 文档 —— Mono》 ,实现各种其它操作。

😈 在后面的小节中,我们会看一个实现处理 Cors 跨域的 CorsWebFilter ,对理解 WebFilter 有一定的帮助。

6.2 Filtering Handler Functions

在基于函数式编程方式中,可以使用如下的方式,实现对每个路由的过滤处理。代码如下:

// UserRouter.java

@Bean
public RouterFunction<ServerResponse> demo2RouterFunction() {
    return route(GET("/users2/demo2"), request -> ok().bodyValue("demo"))
            .filter(new HandlerFilterFunction<ServerResponse, ServerResponse>() {

                @Override
                public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
                    return next.handle(request).doOnSuccess(new Consumer<ServerResponse>() { // 执行成功后回调

                        @Override
                        public void accept(ServerResponse serverResponse) {
                            logger.info("[accept][执行成功]");
                        }

                    });
                }

            });
}

因为实际场景下,使用到基于函数式编程方式比较少,这里就不扩展开来讲。感兴趣的胖友,可以看看 《Web on Reactive Stack —— Spring WebFlux —— Filtering Handler Functions》 文档。

7. Servlet、Filter、Listener

目前测试下来,java.servlet 提供的 Servlet、Filter、Listener 组件,无法在 WebFlux 中使用。测试的示例,可见 lab-27-webflux-03 。

艿艿翻了下 Spring Security 对 WebFlux 的支持,也是通过实现 WebFlux 接口的 WebFilterChainProxy 过滤器,即在 「6. WebFilter 过滤器」 中看到的内容。

8. Cors 跨域

超过微信文章长度限制,请访问 http://www.iocoder.cn/Spring-Boot/WebFlux/

9. 集成响应式的 MongoDB

超过微信文章长度限制,请访问 http://www.iocoder.cn/Spring-Boot/WebFlux/

10. 集成响应式的 Redis

超过微信文章长度限制,请访问 http://www.iocoder.cn/Spring-Boot/WebFlux/

11. 集成响应式的 Elasticsearch

超过微信文章长度限制,请访问 http://www.iocoder.cn/Spring-Boot/WebFlux/

12. 整合响应式的 JPA

超过微信文章长度限制,请访问 http://www.iocoder.cn/Spring-Boot/WebFlux/

13. 整合响应式的 R2DBC 和事务

超过微信文章长度限制,请访问 http://www.iocoder.cn/Spring-Boot/WebFlux/

14. 其他内容

超过微信文章长度限制,请访问 http://www.iocoder.cn/Spring-Boot/WebFlux/

666. 彩蛋

至此,我们已经完成了 Spring WebFlux 的简单入门。如果用一句简单的话来概括 WebFlux 的话,那就是:

  • WebFlux 在 Spring Framework 5 推出的,以 Reactor 库为基础,基于异步和事件驱动,实现的响应式 Web 开发框架。
  • WebFlux 能够充分利用多核 CPU 的硬件资源,处理大量的并发请求。因此,可以在不扩充硬件的资源的情况下,提升系统的吞吐性和伸缩性。

注意,这里我们提到的是吞吐性和伸缩性,而不是提升每个请求的性能。我们来回想下整个 WebFlux 的执行过程:请求是被作为一个事件丢到线程池中执行,等到执行完毕,异步回调结果给主线程,最后返回给前端。

那么整个过程,相比 SpringMVC 的执行过程来说,至少多了一次线程的上下文切换。我们都知道,线程的切换是有成本的。所以,单看一个请求的处理,SpringMVC 的性能是优于 WebFlux 的。

我们上文提到的主线程,一般来说就是 IO 线程。

但是,由于 WebFlux 的 IO 线程是非阻塞的,可以不断解析请求,丢到线程池中执行。而 SpringMVC 的 IO 线程是阻塞的,需要等到请求被处理完毕,才能解析下一个请求并进行处理。这样,随着每个请求的被处理时间越长、并发请求的量级越大,WebFlux 相比 SpringMVC 的整体吞吐量高的越多,平均的请求响应时间越短。如下图所示:

性能对比

从图中,我们可以看到,随着并发请求量的增大,WebFlux 的响应时间平稳在 100ms 左右,而 SpringMVC 的响应式时间从 3000 并发量开始,响应时间直线上升。😈 感兴趣的胖友,可以参考如下文章,自己做一波性能的基准测试:

  • 《性能测试 —— SpringMVC、Webflux 基准测试》
  • 《性能测试 —— Spring Cloud Gateway、Zuul 基准测试》
  • 《WebFlux 性能测试》
  • 《WebFlux 性能问题和适用场景》

那么什么场景下的服务,适合使用 WebFlux 呢?我们可以把任务分成 IO 密集型和 CPU 密集型,而服务本质上,是执行一个又一个的任务,所以也可以这么分。😈 不了解 IO 密集型和 CPU 密集型的胖友,可以先看下 《计算密集型和 IO 密集型》 文章。

而我们业务中编写的代码,都无一幸免需要跟 MySQL、MongoDB、Elasticsearch 等数据库打交道,又或者跟 Redis、Memcached 等缓存服务打交道,还或者需要跟 RocketMQ、RabbitMQ、Kafka 等消息队列打交道。无论这些中间件做的多牛逼,性能多么掉渣天,我们都无法避免会经过网络 IO 和磁盘 IO 。所以,我们提供的服务,大多数都是 IO 密集型。很少会存在,直接从内存读取数据,直接返回的情况。

**因此,我们业务中编写的代码,绝大多多多数都是 IO 密集型,都是适合使用 WebFlux 的。**但是,响应式编程对开发人员的编码能力要求会比较高,一旦脑子一抽,在 IO 线程中编写了阻塞代码,反倒出现性能下滑。具体可以看看艿艿在 《性能测试 —— SpringMVC、Webflux 基准测试》 提供的测试示例,明明白白的。

艿艿建议的话,如果考虑使用 WebFlux 的话,一定要把 Reactor 好好学习下,不然真的是做厮大发好。同时,每次上线之前,对使用 WebFlux 编写的服务,做下性能测试,可以发现编写不正确的地方,找到阻塞 IO 线程的逻辑。

目前,暂时找不到大规模使用 WebFlux 的业务开源项目,最大使用 WebFlux 构建的开源项目,就是 Spring Cloud 开源的网关 Spring Cloud Gateway 。😈 可能,WebFlux 或者响应式编程最好的归宿,暂时是中间件。如果胖友有看过 Dubbo 的线程模型,就会发现和 WebFlux 是异曲同工之妙。

OK ,哔哔结束~如果胖友想要进一步了解 WebFlux 的话,不烦看看 Spring Cloud Gateway 的源码,可以看看艿艿写的 《芋道 Spring Cloud Gateway 源码解析》 。




欢迎加入我的知识星球,一起探讨架构,交流源码。加入方式,长按下方二维码噢

已在知识星球更新源码解析如下:

最近更新《芋道 SpringBoot 2.X 入门》系列,已经 20 余篇,覆盖了 MyBatis、Redis、MongoDB、ES、分库分表、读写分离、SpringMVC、Webflux、权限、WebSocket、Dubbo、RabbitMQ、RocketMQ、Kafka、性能测试等等内容。

提供近 3W 行代码的 SpringBoot 示例,以及超 4W 行代码的电商微服务项目。

获取方式:点“在看”,关注公众号并回复 666 领取,更多内容陆续奉上。


评论