【Java开发】 Spring 05 :Project Reactor 响应式流框架(以Reactive方式访问Redis为例)

news2024/11/17 17:51:35

响应式编程基于 Project Reactor(Reactor 是一个运行在 Java8 之上的响应式框架)的思想,当你做一个带有一定延迟的才能够返回的 IO 操作时,不会阻塞,而是立刻返回一个流,并且订阅这个流,当这个流上产生了返回数据,可以立刻得到通知并调用回调函数处理数据。本文以 Reactive 方式访问 Redis 为例介绍 Project Reactor 响应式流框架~

目录

1 Project Reactor 介绍

2 以 Reactive 方式访问Redis

2.1 环境搭建

①初步环境搭建见于 【Spring 04】 链接的 2.1 小节

②然后新增 RedisConfig 继承 RedisReactiveAutoConfiguration 类

2.2 基于 ReactiveStringRedisTemplate 实现 CRUD

① set + Mono 

② append + Mono 

③ delete + Mono 

④ get + Mono 

⑤ opsForList + Flux

⑥ opsForHash+ Flux

⑦ get + Flux + buffer()

⑧ get + Flux + cache()

⑨  map()、filter()、take()等操作符

2.3 关于 ReactiveCrudRepository


1 Project Reactor 介绍

Reactor 是一个运行在 Java8 之上满足 Reactice 规范的响应式框架,它提供了一组响应式风格的 API,主要目的:希望用少量、有限个数的线程来满足高负载的需要。IO阻塞浪费系统性能,只有纯异步处理才能发挥系统的全部性能。

介绍一下最重要的两个类,可能单纯的介绍会让大家觉得云里雾里,但是看到后边 Redis 的实践内容就会恍然大悟了。

Reactor 有两个核心类: Flux<T> 和 Mono<T>,这两个类都实现 Publisher 接口。

  • Flux 可以触发零到多个事件,并根据实际情况结束处理或触发错误。
  • Mono 最多只触发一个事件,所以可以把 Mono 用于在异步任务完成时发出通知。

简单来说,Mono<T> 表示 0~1 的序列, Flux<T>用来表示 0~N 个元素序列,

本文 Reactive 方式访问 Redis 为例,因为

2 以 Reactive 方式访问Redis

2.1 环境搭建

①初步环境搭建见于 【Spring 04】 链接的 2.1 小节

②然后新增 RedisConfig 继承 RedisReactiveAutoConfiguration 类

路径:src/main/java/com/yinyu/redisdemo/config/RedisConfig.java

@Configuration
public class RedisConfig extends RedisReactiveAutoConfiguration {

}

2.2 基于 ReactiveStringRedisTemplate 实现 CRUD

本文采用 ReactiveStringRedisTemplate,展示性更强,当然ReactiveRedisTemplate 也可使用,这两者的区别类似 RedisTemplate 和 StringRedisTemplate,见于 【Spring 04】 链接的 2.3.1 。

opsForValue、opsForList、opsForSet、opsForZSet、opsForHash 等具体方法的操作也与 StringRedisTemplate 一致,见于 【Spring 04】 链接的 2.3.1 小节,主要是返回内容有所区别(见下文),本文选择重要的内容,并简单介绍几个操作符~

路径:src/test/java/com/yinyu/redisdemo/reactiveStringRedisTemplateTest.java

首先注入 ReactiveStringRedisTemplate 类:

@Autowired
private ReactiveStringRedisTemplate reactiveStringRedisTemplate;

① set + Mono<Boolean> 

Ⅰmono.block() -- 返回 mono 内的元素

此处用到了 Reactor 的 Mono,同时 Mono 包含 Boolean,用于返回插入操作是否成功~

因为 Boolean 被封装在 Mono 内,所以无法直接得知 True 还是 False,那么 mono.block() 就起到了这个作用,简单来说是接触了 Mono 的这层封装,从而返回 Boolean。

@Test
public void reactiveOpsForValueSetTest1() {
    Mono<Boolean> mono = reactiveStringRedisTemplate.opsForValue().set("human","yinyu");
    System.out.println(mono.block());
}

插入成功 👇

 控制台输出成功 👇

Ⅱ mono.subscribe() -- 订阅

所有的操作只有在订阅的那一刻才开始进行!!!详情链接:【Reactor学】

本文只用到 subscribe(System.out::println) ,里边就是函数式编程写法~

@Test
public void reactiveOpsForValueSetTest1() {
    Mono<Boolean> mono = reactiveStringRedisTemplate.opsForValue().set("Chinese","yinyu");
    mono.subscribe(System.out::println);
}

插入成功 👇

控制台输出成功 👇

Ⅲ mono.subscribe().dispose()

表示彻底停止正在推送数据中的Flux或Mono流

@Test
public void reactiveOpsForValueSetTest1() {
    Mono<Boolean> mono = reactiveStringRedisTemplate.opsForValue().set("Chinese","yinyu");
    mono.subscribe(System.out::println).dispose();
}

② append + Mono<Long> 

接下来都用 subscribe(System.out::println) 控制台输出,返回的是 append 后字符串的长度

@Test
public void reactiveOpsForValueAppendTest() {
    Mono<Long> mono = reactiveStringRedisTemplate.opsForValue().append("human","+java");
    mono.subscribe(System.out::println);
}

新增字符到末尾成功👇

控制台输出 append 后字符串的长度👇

③ delete + Mono<Boolean> 

根据 key 删除记录,返回 Mono 包装的 Boolean

@Test
public void reactiveOpsForValueDeleteTest() {
    Mono<Boolean> mono = reactiveStringRedisTemplate.opsForValue().delete("human");
    mono.subscribe(System.out::println);
}

④ get + Mono<String> 

根据 key 查询记录,返回 Mono 包装的 String

@Test
public void reactiveOpsForValueGetTest() {
    Mono<String> mono = reactiveStringRedisTemplate.opsForValue().get("human");
    mono.subscribe(System.out::println);
}

查询成功:

⑤ opsForList + Flux<String>

Ⅰ reactiveopsForList 新增操作

新增 OpsForList 记录也能输出 Mono<Long>,还记得 subscribe() 的作用吗,我们在这做个示范,若第一条列表新增方法没调用 subscribe() 会如何👇

@Test
public void reactiveOpsForListTest1() {
    ReactiveListOperations<String, String> listOperations = reactiveStringRedisTemplate.opsForList();
    //1、没有使用 subscribe()
    listOperations.leftPush("reactiveList", "hello1");
    //2、直接调用 subscribe()
    listOperations.leftPush("reactiveList", "world2").subscribe();
    //3、对输出的 mono 使用 subscribe()
    Mono<Long> mono = listOperations.leftPush("reactiveList", "yinyu3");
    mono.subscribe(System.out::println);
}

数据库界面 👇,可以看到未调用 subscribe() 的步骤未执行,这就是“所有的操作只有在订阅的那一刻才开始进行!!”的含义,同时用 block() 代替也可实现。

Ⅱ 查询操作

Flux<String> 中 Flus 的作用相当于 List ,接触封装后打印出类似 List<String> 的形式

@Test
public void reactiveOpsForListTest2() {
    Flux<String> flux = reactiveStringRedisTemplate.opsForList().range("list",0,2);// 取 key 值为 list 的索引0到索引2的list
    flux.subscribe(System.out::println);
}

查询成功 👇

⑥ opsForHash+ Flux<Object>

给每个新增的步骤加上 subscribe() ,需要注意的的是查询返回的 Mono 包装的是 Object

@Test
public void reactiveOpsForHashTest(){
    //1、reactiveOpsForHash 新增操作
    ReactiveHashOperations<String, String, String> hashOperations = reactiveStringRedisTemplate.opsForHash();
    hashOperations.put("Reactivekey", "hashkey1", "hello").subscribe();
    hashOperations.put("Reactivekey", "hashkey2", "world").subscribe();
    hashOperations.put("Reactivekey", "hashkey3", "java").subscribe();
    //2、reactiveOpsForHash 查询操作
    Mono<Object> mono2 = reactiveStringRedisTemplate.opsForHash().get("Reactivekey","hashkey2");
    mono2.subscribe(System.out::println);
}

新增成功 👇

 查询成功 👇

⑦ get + Flux<String> + buffer()

flux 经过 buffer 方法,转换成 list 传递给订阅者,buffer(1)表述1个元素成1个list,简单来说是对每个元素进行了列表封装。

@Test
public void bufferTest(){
    //opsForList() 查询
    Flux<String> flux = reactiveStringRedisTemplate.opsForList().range("list",0,2);// 取 key 值为 list 的索引0到索引2的list
    flux.buffer(1).subscribe(System.out::println);
}

查询成功 👇

每经过一段时间,传递给订阅者一次数据,用到 Duration.ofSeconds(1)(1秒钟的延迟)

@Test
public void bufferTest(){
    //opsForList() 查询
    Flux<String> flux = reactiveStringRedisTemplate.opsForList().range("list",0,2);// 取 key 值为 list 的索引0到索引2的list
    flux.buffer(Duration.ofSeconds(1)).subscribe(System.out::println);
}

⑧ get + Flux<String> + cache()

它缓存 Flux/Mono 前面步骤的结果,直到调用 cache() 方法为止

@Test
@SneakyThrows
public void cacheTest1(){
    Flux<String> flux = reactiveStringRedisTemplate.opsForList().range("list",0,2);// 取 key 值为 list 的索引0到索引2的list
    var cached = flux.cache(Duration.ofSeconds(2));
    cached.subscribe(System.out::println);
}

⑨  map()、filter()、take()等操作符

本文举几个相对常用的操作符,详情的话还请看我之前写的这篇文章:【Java8:Stream流详解】

Ⅰmap()

接受一个函数作为参数,这个函数会被应用到每个元素上,并将其映射成一个新的元素。

@Test
public void mapTest(){
    //查询
    Flux<String> flux = reactiveStringRedisTemplate.opsForList().range("list",0,2);// 取 key 值为 list 的索引0到索引2的list
    //map--对 Flux 里的每个元素加上 " Good",然后输出
    flux.map(e -> e + " Good").subscribe(System.out::println);
    //map--对 buffer(1) 后的每个列表元素里的 String 加上 " Great",然后输出每个列表元素里的 String
    flux.buffer(1).map(List->List.stream().map(e-> e + " Great")).subscribe(e->e.forEach(System.out::println));
}

操作成功 👇

Ⅱ filter()

过滤出符合条件的记录~

@Test
public void filterTest(){
    //查询
    Flux<String> flux = reactiveStringRedisTemplate.opsForList().range("list",0,2);// 取 key 值为 list 的索引0到索引2的list
    flux.filter(e->e.equals("yinyu")).subscribe(System.out::println);
}

过滤成功 👇

Ⅲ take()

指定发送事件个数,以下为指定第一个事件 👇

@Test
public void takeTest(){
    //查询
    Flux<String> flux = reactiveStringRedisTemplate.opsForList().range("list",0,2);// 取 key 值为 list 的索引0到索引2的list
    flux.take(1).subscribe(System.out::println);
}

指定成功 👇

2.3 关于 ReactiveCrudRepository

其实,Spring Data 也为 Reactive 形式访问数据库提供了支持,类似 CrudRepository ,只是多了 Reactor 响应式框架的内容,至于对数据流(Mono、Flux等)的操作参考前文即可,但可惜的是该形式不被 Redis 支持 👇,不过 MongoDB是可以用 ReactiveCrudRepository 的

 


参考文章

Flux、Mono、Reactor 实战(史上最全)_架构师-尼恩的博客-CSDN博客

Reactive的方式访问Redis - 腾讯云开发者社区-腾讯云 (tencent.com)

flux 中的 buffer 的原理__lrs的博客-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/30114.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【论文阅读】社交网络传播最大化问题-03

Leader-aware community detection in complex networksLeader-aware community detection algorithm - 领导感知社区检测算法创新点相关工作概念定义基础概念创新概念1. &#xff08;领导力&#xff09;2. &#xff08;边缘紧性&#xff09;3.&#xff08;引力&#xff09;模型…

【javaEE】网络原理(传输层Part1)

努力经营当下&#xff0c;直至未来明朗&#xff01; 文章目录前言传输层1. 介绍UDP协议2.【TCP】&#xff08;重难考点&#xff09;TCP可靠传输的机制1. 确认应答2. 超时重传3. 连接管理&#xff08;三次握手、四次挥手&#xff09;【面试题&#xff01;&#xff01;】THINK前言…

项目记录:使用SpringBoot + MyBatisPlus 在MySQL字段设置外键后ID自增失效导致添加失败问题(ID生成策略)

目录 说明 外键列设置后自增失效特性演示 ID不设置自增策略&#xff0c;报错问题和解决 设置自增策略冲突问题和解决。 说明 记录在使用SpringBoot MyBatisPlus操作数据库以及和前端页面交互时遇到的问题和解决方式。 1.表主键字段设置外键之后&#xff0c;自增功能失效…

Spring Security使用JSON格式登录

本文内容来自王松老师的《深入浅出Spring Security》&#xff0c;自己在学习的时候为了加深理解顺手抄录的&#xff0c;有时候还会写一些自己的想法。 Spring Security中默认的登录参数传递的格式是key/value形式&#xff0c;也是表单登录格式。在实际项目中我们可能会通过Json…

小米蓝牙耳机怎么选?适合小米手机的蓝牙耳机推荐

小米可以说是数码界的一股清流&#xff0c;在手机价格上做出了巨大的贡献&#xff0c;它的产品已经覆盖了我们的生活&#xff0c;包括智能家居、穿戴设备、通讯等等&#xff0c;蓝牙耳机作为出行必备的蓝牙耳机单品&#xff0c;耳机品牌众多&#xff0c;意味着我们有更多的选择…

带你深入了解什么是 Java 线程池技术

我们在程序开发中为了“压榨”计算机的 CPU 资源&#xff0c;会去使用多线程来提高程序的性能&#xff0c;在高并发的场景下&#xff0c;多线程编程显得尤为重要。而在线上&#xff0c;我们使用多线程大部分都是通过线程池来管理。线程池是一种基于池化思想的线程管理工具&…

服务器优化

文章目录服务器负载分析CPU 使用率内存使用率磁盘 I/O平均负载网络使用情况服务器内核参数调优单个进程最大打开文件数TCP 相关设置服务器负载分析 在性能调优时&#xff0c;需要先对服务器负载进行分析&#xff0c;通常而言&#xff0c;我们主要分析 CPU 使用率、内存使用率、…

android——自定义加载按钮LoadingButton

方式一 效果图&#xff1a; simpleButton类代码&#xff1a; package com.oneway.demo.navcontroller.view;import android.animation.ObjectAnimator; import android.animation.ValueAnimator; import android.annotation.SuppressLint; import android.content.Context; i…

基于TCP的DNS传输:操作要求

本文档更新了RFC 1123和RFC 1536。本文档要求将允许DNS消息在Internet上通过TCP传输的操作实践作为当前最佳实践。此操作要求与RFC 7766中的实施要求一致。TCP的使用包括基于未加密TCP的DNS以及加密的TLS会话。该文件还考虑了这种形式的DNS通信的后果&#xff0c;以及在不支持当…

腾讯T3整理分享的LeetCode算法小抄完整文档

前言 本文⽬前可以⼿把⼿带你解决 110 道 LeetCode 算法问题&#xff0c;⽽且在不断更新&#xff0c;全部基于 LeetCode 的题⽬&#xff0c;涵盖了所有题型和技巧。 目录 主要内容 ⽬前已包含的 114 道题⽬教程如下&#xff1a; 1.两数之和 10.正则表达式匹配 100.相同的树 …

vue中的transition学习

transition 会在一个元素或组件进入和离开 DOM 时应用动画。他可以将进入和离开的动画应用通过默认插槽传递给它的元素或者组件上 transitionGroup 会在一个 v-for 列表中的元素或组件被插入&#xff0c;移动&#xff0c;或移除时应用动画 <Transition> 组件 进入或者…

[附源码]java毕业设计研究生管理系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

GoLand 文件增加头部注释

一 背景 为了统一规范,决定在项目中要增加注释,包括文件的头部注释以及函数方法的注释。函数方法的注释好说,文件头部注释这个搞了半天(搞完后才发现真的很容易),记录下自己搞得过程,方便其他人。 二 设置头部注释的步骤 我的系统环境是Macos,这篇文章针对的Mac电…

mitmproxy

我们经常了解到的抓包工具有wireshark、fiddler、charles等&#xff0c;mitmproxy也是一个代理工具&#xff0c;突出的优点是可以命令行方式或脚本的方式进行代理&#xff0c;可以对请求数据进行二次开发&#xff08;二次定制&#xff09; 官网&#xff1a;https://mitmproxy.o…

10 张图解 K8S CNI Calico 网络模型原理与功能实战

一、概述 Calico 是一个联网和网络策略供应商。Calico 支持一套灵活的网络选项&#xff0c;因此你可以根据自己的情况选择最有效的选项&#xff0c;包括非覆盖和覆盖网络&#xff0c;带或不带 BGP。Calico 使用相同的引擎为主机、Pod 和&#xff08;如果使用 Istio 和 Envoy&am…

Nginx:配置

文章目录1、Nginx 工作原理2、Nginx 安装启动2.1、安装2.2、启动3、配置文件3.1、块配置3.2、代理 & 负载均衡3.2.1、代理正向代理反向代理3.2.2、负载均衡3.3、Nginx 缓存3.4、Nginx 限流4、http 配置使用4.1、配置结构4.2、配置命令4.2.1、设置配置命令4.2.2、设置回调方…

你也还在找程序员外包平台吗?有这几个就足够了!

大家都知道&#xff0c;如果程序员想在工作之余赚一点外快的话&#xff0c;接外包是所有兼职赚钱之中来钱比较快的一种。但是要找到一些比较靠谱的&#xff0c;能够经常使用的接外包平台&#xff0c;似乎是一件费时又费力的事情。 接下来就为大家推荐几个比较好的程序员接外包的…

Python_数据容器_列表list

一、数据容器入门 使用场景&#xff1a;批量存储、批量使用多份数据 Python中的数据容器&#xff1a; 一种可以容纳多份数据的数据类型&#xff0c;容纳的每一份数据称之为一个元素。每一个元素&#xff0c;可以是任意类型的数据&#xff0c;如字符串、数字、布尔等。 总结&…

Windows下的RabbitMQ 安装

1.到rabbitmq官网下载安装程序 Messaging that just works — RabbitMQ 1.1 我选择的事RabbitMQ 3.11.3 1.2 点击链接后进入下面的界面 1.3 继续点击RabbitMQ 3.11.3 release 链接 Release RabbitMQ 3.11.3 rabbitmq/rabbitmq-server GitHub 1.4 在页面最下面可以看到下…

葡萄糖-聚乙二醇-四嗪/叶酸/多巴胺 Glucose-PEG-TZ/FA/Dopamine

葡萄糖-聚乙二醇-四嗪/叶酸/多巴胺 Glucose-PEG-TZ/FA/Dopamine 叶酸是一种水溶性维生素&#xff0c;分子式是C19H19N7O6。因绿叶中含量十分丰富而得名&#xff0c;又名蝶酰谷氨酸。在自然界中有几种存在形式&#xff0c;其母体化合物是由蝶啶、对氨基苯甲酸和谷氨酸3种成分结…