响应式编程基于 Project Reactor(Reactor 是一个运行在 Java8 之上的响应式框架)的思想,当你做一个带有一定延迟的才能够返回的 IO 操作时,不会阻塞,而是立刻返回一个流,并且订阅这个流,当这个流上产生了返回数据,可以立刻得到通知并调用回调函数处理数据。本文以 Reactive 方式访问 Redis 为例介绍 Project Reactor 响应式流框架~
目录
1 Project Reactor 介绍
2 以 Reactive 方式访问Redis
2.1 环境搭建
①初步环境搭建见于 【Spring 04】 链接的 2.1 小节
②然后新增 RedisConfig 继承 RedisReactiveAutoConfiguration 类
2.2 基于 ReactiveStringRedisTemplate 实现 CRUD
① set + Mono
② append + Mono
③ delete + Mono
④ get + Mono
⑤ opsForList + Flux
⑥ opsForHash+ Flux
⑦ get + Flux + buffer()
⑧ get + Flux + cache()
⑨ map()、filter()、take()等操作符
2.3 关于 ReactiveCrudRepository
1 Project Reactor 介绍
Reactor 是一个运行在 Java8 之上满足 Reactice 规范的响应式框架,它提供了一组响应式风格的 API,主要目的:希望用少量、有限个数的线程来满足高负载的需要。IO阻塞浪费系统性能,只有纯异步处理才能发挥系统的全部性能。
介绍一下最重要的两个类,可能单纯的介绍会让大家觉得云里雾里,但是看到后边 Redis 的实践内容就会恍然大悟了。
Reactor 有两个核心类: Flux<T>
和 Mono<T>
,这两个类都实现 Publisher 接口。
- Flux 可以触发零到多个事件,并根据实际情况结束处理或触发错误。
- Mono 最多只触发一个事件,所以可以把 Mono 用于在异步任务完成时发出通知。
简单来说,Mono<T>
表示 0~1 的序列, Flux<T>
用来表示 0~N 个元素序列,
本文 Reactive 方式访问 Redis 为例,因为
2 以 Reactive 方式访问Redis
2.1 环境搭建
①初步环境搭建见于 【Spring 04】 链接的 2.1 小节
②然后新增 RedisConfig 继承 RedisReactiveAutoConfiguration 类
路径:src/main/java/com/yinyu/redisdemo/config/RedisConfig.java
@Configuration
public class RedisConfig extends RedisReactiveAutoConfiguration {
}
2.2 基于 ReactiveStringRedisTemplate 实现 CRUD
本文采用 ReactiveStringRedisTemplate,展示性更强,当然ReactiveRedisTemplate 也可使用,这两者的区别类似 RedisTemplate 和 StringRedisTemplate,见于 【Spring 04】 链接的 2.3.1 。
opsForValue、opsForList、opsForSet、opsForZSet、opsForHash 等具体方法的操作也与 StringRedisTemplate 一致,见于 【Spring 04】 链接的 2.3.1 小节,主要是返回内容有所区别(见下文),本文选择重要的内容,并简单介绍几个操作符~
路径:src/test/java/com/yinyu/redisdemo/reactiveStringRedisTemplateTest.java
首先注入 ReactiveStringRedisTemplate 类:
@Autowired
private ReactiveStringRedisTemplate reactiveStringRedisTemplate;
① set + Mono<Boolean>
Ⅰmono.block() -- 返回 mono 内的元素
此处用到了 Reactor 的 Mono,同时 Mono 包含 Boolean,用于返回插入操作是否成功~
因为 Boolean 被封装在 Mono 内,所以无法直接得知 True 还是 False,那么 mono.block() 就起到了这个作用,简单来说是接触了 Mono 的这层封装,从而返回 Boolean。
@Test
public void reactiveOpsForValueSetTest1() {
Mono<Boolean> mono = reactiveStringRedisTemplate.opsForValue().set("human","yinyu");
System.out.println(mono.block());
}
插入成功 👇
控制台输出成功 👇
Ⅱ mono.subscribe() -- 订阅
所有的操作只有在订阅的那一刻才开始进行!!!详情链接:【Reactor学】
本文只用到 subscribe(System.out::println) ,里边就是函数式编程写法~
@Test
public void reactiveOpsForValueSetTest1() {
Mono<Boolean> mono = reactiveStringRedisTemplate.opsForValue().set("Chinese","yinyu");
mono.subscribe(System.out::println);
}
插入成功 👇
控制台输出成功 👇
Ⅲ mono.subscribe().dispose()
表示彻底停止正在推送数据中的Flux或Mono流
@Test
public void reactiveOpsForValueSetTest1() {
Mono<Boolean> mono = reactiveStringRedisTemplate.opsForValue().set("Chinese","yinyu");
mono.subscribe(System.out::println).dispose();
}
② append + Mono<Long>
接下来都用 subscribe(System.out::println) 控制台输出,返回的是 append 后字符串的长度
@Test
public void reactiveOpsForValueAppendTest() {
Mono<Long> mono = reactiveStringRedisTemplate.opsForValue().append("human","+java");
mono.subscribe(System.out::println);
}
新增字符到末尾成功👇
控制台输出 append 后字符串的长度👇
③ delete + Mono<Boolean>
根据 key 删除记录,返回 Mono 包装的 Boolean
@Test
public void reactiveOpsForValueDeleteTest() {
Mono<Boolean> mono = reactiveStringRedisTemplate.opsForValue().delete("human");
mono.subscribe(System.out::println);
}
④ get + Mono<String>
根据 key 查询记录,返回 Mono 包装的 String
@Test
public void reactiveOpsForValueGetTest() {
Mono<String> mono = reactiveStringRedisTemplate.opsForValue().get("human");
mono.subscribe(System.out::println);
}
查询成功:
⑤ opsForList + Flux<String>
Ⅰ reactiveopsForList 新增操作
新增 OpsForList 记录也能输出 Mono<Long>,还记得 subscribe() 的作用吗,我们在这做个示范,若第一条列表新增方法没调用 subscribe() 会如何👇
@Test
public void reactiveOpsForListTest1() {
ReactiveListOperations<String, String> listOperations = reactiveStringRedisTemplate.opsForList();
//1、没有使用 subscribe()
listOperations.leftPush("reactiveList", "hello1");
//2、直接调用 subscribe()
listOperations.leftPush("reactiveList", "world2").subscribe();
//3、对输出的 mono 使用 subscribe()
Mono<Long> mono = listOperations.leftPush("reactiveList", "yinyu3");
mono.subscribe(System.out::println);
}
数据库界面 👇,可以看到未调用 subscribe() 的步骤未执行,这就是“所有的操作只有在订阅的那一刻才开始进行!!”的含义,同时用 block() 代替也可实现。
Ⅱ 查询操作
Flux<String> 中 Flus 的作用相当于 List ,接触封装后打印出类似 List<String> 的形式
@Test
public void reactiveOpsForListTest2() {
Flux<String> flux = reactiveStringRedisTemplate.opsForList().range("list",0,2);// 取 key 值为 list 的索引0到索引2的list
flux.subscribe(System.out::println);
}
查询成功 👇
⑥ opsForHash+ Flux<Object>
给每个新增的步骤加上 subscribe() ,需要注意的的是查询返回的 Mono 包装的是 Object
@Test
public void reactiveOpsForHashTest(){
//1、reactiveOpsForHash 新增操作
ReactiveHashOperations<String, String, String> hashOperations = reactiveStringRedisTemplate.opsForHash();
hashOperations.put("Reactivekey", "hashkey1", "hello").subscribe();
hashOperations.put("Reactivekey", "hashkey2", "world").subscribe();
hashOperations.put("Reactivekey", "hashkey3", "java").subscribe();
//2、reactiveOpsForHash 查询操作
Mono<Object> mono2 = reactiveStringRedisTemplate.opsForHash().get("Reactivekey","hashkey2");
mono2.subscribe(System.out::println);
}
新增成功 👇
查询成功 👇
⑦ get + Flux<String> + buffer()
Ⅰ flux 经过 buffer 方法,转换成 list 传递给订阅者,buffer(1)表述1个元素成1个list,简单来说是对每个元素进行了列表封装。
@Test
public void bufferTest(){
//opsForList() 查询
Flux<String> flux = reactiveStringRedisTemplate.opsForList().range("list",0,2);// 取 key 值为 list 的索引0到索引2的list
flux.buffer(1).subscribe(System.out::println);
}
查询成功 👇
Ⅱ 每经过一段时间,传递给订阅者一次数据,用到 Duration.ofSeconds(1)(1秒钟的延迟)
@Test
public void bufferTest(){
//opsForList() 查询
Flux<String> flux = reactiveStringRedisTemplate.opsForList().range("list",0,2);// 取 key 值为 list 的索引0到索引2的list
flux.buffer(Duration.ofSeconds(1)).subscribe(System.out::println);
}
⑧ get + Flux<String> + cache()
它缓存 Flux/Mono 前面步骤的结果,直到调用 cache() 方法为止
@Test
@SneakyThrows
public void cacheTest1(){
Flux<String> flux = reactiveStringRedisTemplate.opsForList().range("list",0,2);// 取 key 值为 list 的索引0到索引2的list
var cached = flux.cache(Duration.ofSeconds(2));
cached.subscribe(System.out::println);
}
⑨ map()、filter()、take()等操作符
本文举几个相对常用的操作符,详情的话还请看我之前写的这篇文章:【Java8:Stream流详解】
Ⅰmap()
接受一个函数作为参数,这个函数会被应用到每个元素上,并将其映射成一个新的元素。
@Test
public void mapTest(){
//查询
Flux<String> flux = reactiveStringRedisTemplate.opsForList().range("list",0,2);// 取 key 值为 list 的索引0到索引2的list
//map--对 Flux 里的每个元素加上 " Good",然后输出
flux.map(e -> e + " Good").subscribe(System.out::println);
//map--对 buffer(1) 后的每个列表元素里的 String 加上 " Great",然后输出每个列表元素里的 String
flux.buffer(1).map(List->List.stream().map(e-> e + " Great")).subscribe(e->e.forEach(System.out::println));
}
操作成功 👇
Ⅱ filter()
过滤出符合条件的记录~
@Test
public void filterTest(){
//查询
Flux<String> flux = reactiveStringRedisTemplate.opsForList().range("list",0,2);// 取 key 值为 list 的索引0到索引2的list
flux.filter(e->e.equals("yinyu")).subscribe(System.out::println);
}
过滤成功 👇
Ⅲ take()
指定发送事件个数,以下为指定第一个事件 👇
@Test
public void takeTest(){
//查询
Flux<String> flux = reactiveStringRedisTemplate.opsForList().range("list",0,2);// 取 key 值为 list 的索引0到索引2的list
flux.take(1).subscribe(System.out::println);
}
指定成功 👇
2.3 关于 ReactiveCrudRepository
其实,Spring Data 也为 Reactive 形式访问数据库提供了支持,类似 CrudRepository ,只是多了 Reactor 响应式框架的内容,至于对数据流(Mono、Flux等)的操作参考前文即可,但可惜的是该形式不被 Redis 支持 👇,不过 MongoDB是可以用 ReactiveCrudRepository 的。
参考文章
Flux、Mono、Reactor 实战(史上最全)_架构师-尼恩的博客-CSDN博客
Reactive的方式访问Redis - 腾讯云开发者社区-腾讯云 (tencent.com)
flux 中的 buffer 的原理__lrs的博客-CSDN博客