一、概述
1.1简介
简单来说,Webflux 是响应式编程的框架,与其对等的概念是 SpringMVC。两者的不同之处在于 Webflux 框架是异步非阻塞的,其可以通过较少的线程处理高并发请求。
WebFlux:底层完全基于netty+reactor+springweb 完成一个全异步非阻塞的web响应式框架
底层:异步 + 消息队列(内存) + 事件回调机制 = 整套系统
优点:能使用少量资源处理大量请求;
以前: 浏览器 --> Controller --> Service --> Dao: 阻塞式编程
现在: Dao(数据源查询对象【数据发布者】) --> Service --> Controller --> 浏览器: 响应式
1.2什么是异步 Servlet
在 Servlet3.0 之前,Servlet 采用 Thread-Per-Request 的方式处理 Http 请求,即每一次请求都是由某一个线程从头到尾负责处理。
如果一个请求需要进行 IO 操作,比如访问数据库、调用第三方服务接口等,那么其所对应的线程将同步地等待 IO 操作完成, 而 IO 操作是非常慢的,所以此时的线程并不能及时地释放回线程池以供后续使用,如果并发量很大的话,那肯定会造性能问题。
有了异步 Servlet 之后,后台 Servlet 的线程会被及时释放,释放之后又可以去接收新的请求,进而提高应用的并发能力。
1.3SSE
SSE 全称是 Server-Sent Events,它的作用和 WebSocket 的作用相似,都是建立浏览器与服务器之间的通信渠道,然后服务器向浏览器推送信息,不同的是,WebSocket 是一种全双工通信协议,而 SSE 则是一种单工通信协议,即使用 SSE 只能服务器向浏览器推送信息流,浏览器如果向服务器发送信息,就是一个普通的 HTTP 请求。
使用 SSE,当服务端给客户端响应的时候,他不是发送一个一次性数据包,而是会发送一个数据流,这个时候客户端的连接不会关闭,会一直等待服务端发送过来的数据流,我们常见的视频播放其实就是这样的例子。
SSE 和 WebSocket 主要有如下区别:
-
SSE 使用 HTTP 协议,现有的服务器软件都支持。WebSocket 是一个独立协议。
-
SSE 属于轻量级,使用简单;WebSocket 协议相对复杂。
-
SSE 默认支持断线重连,WebSocket 需要自己实现。
-
SSE 一般只用来传送文本,二进制数据需要编码后传送,WebSocket 默认支持传送二进制数据。
-
SSE 支持自定义发送的消息类型。
二、快速入门
2.0创建工程
为了演示方便,松哥这里就直接采用 Spring Boot 工程了,首先我们创建一个 Spring Boot 工程,需要注意的是,以往创建 Spring Boot 时我们都是选择 Spring Web 依赖,但是这次我们选择 Spring Reactive Web 依赖,如下图:
添加上这一个依赖就 OK 了。
这个时候创建好的 Spring Boot 项目,底层容器是 Netty 而不是我们之前广泛使用的 Tomcat 了。
2.1添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
2.2HttpHandler、HttpServer
public class FluxMainApplication {
public static void main(String[] args) throws IOException {
//快速自己编写一个能处理请求的服务器
//1、创建一个能处理Http请求的处理器。 参数:请求、响应; 返回值:Mono<Void>:代表处理完成的信号
HttpHandler handler = (ServerHttpRequest request,
ServerHttpResponse response)->{
URI uri = request.getURI();
System.out.println(Thread.currentThread()+"请求进来:"+uri);
//编写请求处理的业务,给浏览器写一个内容 URL + "Hello~!"
// response.getHeaders(); //获取响应头
// response.getCookies(); //获取Cookie
// response.getStatusCode(); //获取响应状态码;
// response.bufferFactory(); //buffer工厂
// response.writeWith() //把xxx写出去
// response.setComplete(); //响应结束
//数据的发布者:Mono<DataBuffer>、Flux<DataBuffer>
//创建 响应数据的 DataBuffer
DataBufferFactory factory = response.bufferFactory();
//数据Buffer
DataBuffer buffer = factory.wrap(new String(uri.toString() + " ==> Hello!").getBytes());
// 需要一个 DataBuffer 的发布者
return response.writeWith(Mono.just(buffer));
};
//2、启动一个服务器,监听8080端口,接受数据,拿到数据交给 HttpHandler 进行请求处理
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);
//3、启动Netty服务器
HttpServer.create()
.host("localhost")
.port(8080)
.handle(adapter) //用指定的处理器处理请求
.bindNow(); //现在就绑定
System.out.println("服务器启动完成....监听8080,接受请求");
System.in.read();
System.out.println("服务器停止....");
}
}
2.3DispatcherHandler
SpringMVC: DispatcherServlet;
SpringWebFlux: DispatcherHandler
package com.yanyu.webflux.controller;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.reactive.result.view.Rendering;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
/**
* @author lfy
* @Description
* @create 2023-12-01 20:52
*/
@ResponseBody
@Controller
public class HelloController {
//WebFlux: 向下兼容原来SpringMVC的大多数注解和API;
@GetMapping("/hello")
public String hello(@RequestParam(value = "key",required = false,defaultValue = "哈哈") String key,
ServerWebExchange exchange,
WebSession webSession,
HttpMethod method,
HttpEntity<String> entity,
@RequestBody String s,
FilePart file){
// file.transferTo() //零拷贝技术;
ServerHttpRequest request = exchange.getRequest();
ServerHttpResponse response = exchange.getResponse();
String name = method.name();
Object aaa = webSession.getAttribute("aaa");
webSession.getAttributes().put("aa","nn");
return "Hello World!!! key="+key;
}
// Rendering:一种视图对象。
@GetMapping("/bai")
public Rendering render(){
// Rendering.redirectTo("/aaa"); //重定向到当前项目根路径下的 aaa
return Rendering.redirectTo("http://www.baidu.com").build();
}
//现在推荐的方式
//1、返回单个数据Mono: Mono<Order>、User、String、Map
//2、返回多个数据Flux: Flux<Order>
//3、配合Flux,完成SSE: Server Send Event; 服务端事件推送
@GetMapping("/haha")
public Mono<String> haha(){
// ResponseEntity.status(305)
// .header("aaa","bbb")
// .contentType(MediaType.APPLICATION_CBOR)
// .body("aaaa")
// .
return Mono.just(0)
.map(i-> 10/i)
.map(i->"哈哈-"+i);
}
@GetMapping("/hehe")
public Flux<String> hehe(){
return Flux.just("hehe1","hehe2");
}
//text/event-stream
//SSE测试; chatgpt都在用; 服务端推送
@GetMapping(value = "/sse",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> sse(){
return Flux.range(1,10)
.map(i-> {
//构建一个SSE对象
return ServerSentEvent.builder("ha-" + i)
.id(i + "")
.comment("hei-" + i)
.event("haha")
.build();
})
.delayElements(Duration.ofMillis(500));
}
//SpringMVC 以前怎么用,基本可以无缝切换。
// 底层:需要自己开始编写响应式代码
}
2.4错误处理
@GetMapping("/haha")
public Mono<String> haha(){
// ResponseEntity.status(305)
// .header("aaa","bbb")
// .contentType(MediaType.APPLICATION_CBOR)
// .body("aaaa")
// .
return Mono.just(0)
.map(i-> 10/i)
.map(i->"哈哈-"+i);
}
@ExceptionHandler(ArithmeticException.class)
public String error(ArithmeticException exception){
System.out.println("发生了数学运算异常"+exception);
//返回这些进行错误处理;
// ProblemDetail: 建造者:声明式编程、链式调用
// ErrorResponse :
return "炸了,哈哈...";
}
2.6常用注解
1、目标方法传参
Method Arguments :: Spring Framework
Controller method argument | Description |
ServerWebExchange | 封装了请求和响应对象的对象; 自定义获取数据、自定义响应 |
ServerHttpRequest, ServerHttpResponse | 请求、响应 |
WebSession | 访问Session对象 |
java.security.Principal | |
org.springframework.http.HttpMethod | 请求方式 |
java.util.Locale | 国际化 |
java.util.TimeZone + java.time.ZoneId | 时区 |
@PathVariable | 路径变量 |
@MatrixVariable | 矩阵变量 |
@RequestParam | 请求参数 |
@RequestHeader | 请求头; |
@CookieValue | 获取Cookie |
@RequestBody | 获取请求体,Post、文件上传 |
HttpEntity<B> | 封装后的请求对象 |
@RequestPart | 获取文件上传的数据 multipart/form-data. |
java.util.Map, org.springframework.ui.Model, and org.springframework.ui.ModelMap. | Map、Model、ModelMap |
@ModelAttribute | |
Errors, BindingResult | 数据校验,封装错误 |
SessionStatus + class-level @SessionAttributes | |
UriComponentsBuilder | For preparing a URL relative to the current request’s host, port, scheme, and context path. See URI Links. |
@SessionAttribute | |
@RequestAttribute | 转发请求的请求域数据 |
Any other argument | 所有对象都能作为参数: 1、基本类型 ,等于标注@RequestParam 2、对象类型,等于标注 @ModelAttribute |
2、返回值写法
sse和websocket区别:
- SSE:单工;请求过去以后,等待服务端源源不断的数据
- websocket:双工: 连接建立后,可以任何交互;
Controller method return value | Description |
@ResponseBody | 把响应数据写出去,如果是对象,可以自动转为json |
HttpEntity<B>, ResponseEntity<B> | ResponseEntity:支持快捷自定义响应内容 |
HttpHeaders | 没有响应内容,只有响应头 |
ErrorResponse | 快速构建错误响应 |
ProblemDetail | SpringBoot3; |
String | 就是和以前的使用规则一样; forward: 转发到一个地址 redirect: 重定向到一个地址 配合模板引擎 |
View | 直接返回视图对象 |
java.util.Map, org.springframework.ui.Model | 以前一样 |
@ModelAttribute | 以前一样 |
Rendering | 新版的页面跳转API; 不能标注 @ResponseBody 注解 |
void | 仅代表响应完成信号 |
Flux<ServerSentEvent>, Observable<ServerSentEvent>, or other reactive type | 使用 text/event-stream 完成SSE效果 |
Other return values | 未在上述列表的其他返回值,都会当成给页面的数据; |
2.7文件上传
@PostMapping("/")
public String handle(@RequestPart("meta-data") Part metadata,
@RequestPart("file-data") FilePart file) {
// ...
}
2.8自定义Flux配置
WebFluxConfigurer
容器中注入这个类型的组件,重写底层逻辑
@Configuration
public class MyWebConfiguration {
//配置底层
@Bean
public WebFluxConfigurer webFluxConfigurer(){
return new WebFluxConfigurer() {
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/**")
.allowedHeaders("*")
.allowedMethods("*")
.allowedOrigins("localhost");
}
};
}
}
2.9 Filter
@Component
public class MyWebFilter implements WebFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
ServerHttpResponse response = exchange.getResponse();
System.out.println("请求处理放行到目标方法之前...");
Mono<Void> filter = chain.filter(exchange); //放行
//流一旦经过某个操作就会变成新流
Mono<Void> voidMono = filter.doOnError(err -> {
System.out.println("目标方法异常以后...");
}) // 目标方法发生异常后做事
.doFinally(signalType -> {
System.out.println("目标方法执行以后...");
});// 目标方法执行之后
//上面执行不花时间。
return voidMono; //看清楚返回的是谁!!!
}
}
三、WebFlux CURD 实战(mongodb)
3.1概述
WebFlux 最为人所诟病的是数据库的支持问题,毕竟数据是一个应用的生命,我们接触的大部分应用程序都是有数据库的,而 WebFlux 在这一方面的支持行一直比较弱,这也是大家总是吐槽它的原因。
不过从 Spring5 开始,这一问题得到了一定程度的缓解。
Spring 官方在 Spring5 发布了响应式 Web 框架 Spring WebFlux 之后急需能够满足异步响应的数据库交互 API,不过由于缺乏标准和驱动,Pivotal 团队开始自己研究响应式关系型数据库连接 Reactive Relational Database Connectivity,并提出了 R2DBC 规范 API 用来评估可行性并讨论数据库厂商是否有兴趣支持响应式的异步非阻塞驱动程序。最早只有 PostgreSQL 、H2、MSSQL 三家数据库厂商,不过现在 MySQL 也加入进来了,这是一个极大的利好。目前 R2DBC 的最新版本是 0.9.0.RELEASE。
3.2相关概念
基于spring-data-mongodb-reactive响应式框架实现与mongodb的互交。 ReactiveMongoRepository<T, ID>支持响应式编程并提供基础功能的接口,类似JpaRepository/BaseMapper接口。 基于Reactive编程时,查询返回多条记录封装在`Flux<T>`而非`Flux<List<T>>`。 再通过collectList()方法将元素聚合为单一`Mono<List<T>>`类型 聚合语句声明在Repository接口查询方法的@Aggregation注解中。java11不支持文本块,可读性太差了。 mongodb日期时间以UTC计算,并自动转换本地时间存储。但在获取时,spring-data-mongodb会转换回本地时间。 因此,涉及日期时间的必须通过业务逻辑操作。
3.3环境配置
maven依赖
yml依赖
spring:
application:
name: mongodb-examples
data:
mongodb:
host: 47.96.254.46
port: 27017
database: test
username: mongo
password: '1213' # 密码按char[]处理。纯数字要加单引号,最好都加单引号
authentication-database: admin
# uri: mongodb://mongo:1213@192.168.1.8/test?authSource=admin # 等效连接
auto-index-creation: true # 仅声明索引注解无效,必须显式声明。
logging:
level:
root: warn
com:
example: debug
pattern:
console: '%-5level %C.%M[%line] - %msg%n'
3.4CRUD
实体类
@Document
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class user {
@Id
private String id;
private String username;
private String address;
}
Repository
@EnableMongoRepositories
public interface UserDao extends ReactiveMongoRepository<User,String> {
}
Controller
@RestController
@RequestMapping("/user")
public class UserController {
@Autowired
UserDao userDao;
@PostMapping("/add")
public Mono<user> addUser(@RequestBody user user) {
return userDao.save(user);
}
@GetMapping("/getall")
public Flux<user> getAll() {
return userDao.findAll();
}
@GetMapping(value = "/stream/all", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<user> streamGetAll() {
return userDao.findAll();
}
@DeleteMapping("/delete/{id}")
public Mono<ResponseEntity<Void>> deleteUser(@PathVariable String id) {
return userDao.findById(id)
.flatMap(user -> userDao.delete(user).then(Mono.just(new ResponseEntity<Void>(HttpStatus.OK))))
.defaultIfEmpty(new ResponseEntity(HttpStatus.NOT_FOUND));
}
@PutMapping("/update")
public Mono<ResponseEntity<user>> updateUser(@RequestBody user user) {
return userDao.findById(user.getId())
.flatMap(u -> userDao.save(user))
.map(u->new ResponseEntity<user>(u,HttpStatus.OK))
.defaultIfEmpty(new ResponseEntity(HttpStatus.NOT_FOUND));
}
}