【深入解析spring cloud gateway】08 Reactor 知识扫盲

news2025/1/14 1:07:58

一、响应式编程概述

1.1 背景知识

为了应对高并发服务器端开发场景,在2009 年,微软提出了一个更优雅地实现异步编程的方式——Reactive Programming,我们称之为响应式编程。随后,Netflix 和LightBend 公司提供了RxJava 和Akka Stream 等技术,使得Java 平台也有了能够实现响应式编程的框架。
在2017 年9 月28 日,Spring 5 正式发布。Spring 5 发布最大的意义在于,它将响应式编程技术的普及向前推进了一大步。而同时,作为在背后支持Spring 5 响应式编程的框架Spring Reactor,也进入了里程碑式的3.1.0 版本。

1.2 什么是响应式编程

响应式编程是一种面向数据流和变化传播的编程范式。这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。
响应式编程基于reactor(Reactor 是一个运行在 Java8 之上的响应式框架)的思想,当你做一个带有一定延迟的才能够返回的io操作时,不会阻塞,而是立刻返回一个流,并且订阅这个流,当这个流上产生了返回数据,可以立刻得到通知并调用回调函数处理数据。
电子表格程序就是响应式编程的一个例子。单元格可以包含字面值或类似"=B1+C1"的公式,而包含公式的单元格的值会依据其他单元格的值的变化而变化。
响应式传播核心特点之一:变化传播:一个单元格变化之后,会像多米诺骨牌一样,导致直接和间接引用它的其他单元格均发生相应变化。

1.3 基于 Reactor 实现

Reactor 是一个运行在 Java8 之上满足 Reactice 规范的响应式框架,它提供了一组响应式风格的 API。
Reactor 有两个核心类: Flux 和 Mono,这两个类都实现 Publisher 接口。
Flux 类似 RxJava 的 Observable,它可以触发零到多个事件,并根据实际情况结束处理或触发错误。
Mono 最多只触发一个事件,所以可以把 Mono 用于在异步任务完成时发出通知。
Flux 和 Mono 都是数据流的发布者,使用 Flux 和 Mono 都可以发出三种数据信号:元素值,错误信号,完成信号;错误信号和完成信号都代表终止信号,终止信号用于告诉订阅者数据流结束了,错误信号终止数据流同时把错误信息传递给订阅者。
三种信号的特点:
错误信号和完成信号都是终止信号,不能共存
如果没有发送任何元素值,而是直接发送错误或者完成信号,表示是空数据流
如果没有错误信号,也没有完成信号,表示是无限数据流

Mono 原理图如下:
在这里插入图片描述

Flux原理图如下:
在这里插入图片描述

结合上面两个图,发现Mono和Flux非常相似。只是Mono只接收一个元素,而Flux接收多个元素

二、示例代码

2.1 Mono

package com.reactor.demo;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;

@Slf4j
public class MonoTest {
    @Test
    public void test1() {
        //just用法
        Mono.just("hello world").subscribe(System.out::println);
        //runnable创建mono
        Mono<Void> sinkMono = Mono.fromRunnable(() -> System.out.println("runnable"));
        //这句不会输出
        sinkMono.doOnNext(unused -> System.out.println("void success"));
        //这句也不会输出
        sinkMono.subscribe(o -> System.out.println("void result" + o));

        //创建一个不包含任何元素,只发布结束消息的序列。,这里的hello empty是不会输出的。
        Mono.empty()
                //输出“empty的入参是null”
                .doOnSuccess(o -> System.out.println("empty的入参是" + o))
                //这句不会输出
                .subscribe(o -> System.out.println("hello empty"));
        //empty里面至少还有一个结束消息,而never则是真的啥都没有。"never的入参是"不会输出 ,这里的hello never也不会输出
        Mono.never().doOnSuccess(o -> System.out.println("never的入参是" + o)).subscribe(o -> System.out.println("hello never"));
    }

    @Test
    public void test2() {
        //传入supplier
        Mono.fromSupplier(() -> "Hello supplier").subscribe(System.out::println);
        //传入optional
        Mono.justOrEmpty(Optional.of("Hello optional")).subscribe(System.out::println);
        //通过sink来创建一个正常执行的Mono
        Mono.create(sink -> sink.success("Hello sink")).subscribe(System.out::println);
        //通过sink来创建一个抛出异常的Mono
        Mono.create(sink -> sink.error(new RuntimeException("sink error"))).subscribe(System.out::println);
        //defer的入参实际上是一个Mono工厂
        Mono.defer(() -> Mono.just("hello defer")).subscribe(System.out::println);
    }

    @Test
    public void test3() {
        //callable,有返回值
        Mono.fromCallable(() -> "callable").subscribe(System.out::println);
        //runnable无返回值
        Mono<Void> mono = Mono.fromRunnable(() -> System.out.println("run"));
        //下面的hello runnable是不会输出的。因为subscribe一个Mono<Void>,不会产生任何结果
        mono.subscribe(o -> System.out.println("hello runnable"));
    }

    @Test
    public void test4() {
        //延迟3秒输出
        Mono.delay(Duration.ofSeconds(3)).doOnNext(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) {
                System.out.println(aLong);
            }
        }).block();

    }

    @Test
    public void test5() {
        //直接输出了异常
        Mono.error(new RuntimeException("这是一个异常")).subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) {
                System.out.println("error:" + o);
            }
        });

        Mono.defer(() -> {
            return Mono.error(new RuntimeException("这是第二个异常"));
        }).subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) {
                System.out.println("defer error:" + o);
            }
        });
    }

    @Test
    public void test6() {
        //通过map可以对元素进行转换
        Mono.just("just one").map(new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return 1;
            }
        }).doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) {
                System.out.println("转换后的结果:" + integer);
            }
        }).subscribe();
    }
}

2.1 Flux

package com.reactor.demo;

import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.Arrays;
import java.util.function.Consumer;

public class FluxTest {

    /**
     * 基本用法
     */
    @Test
    public void test1() {
        //通过just传入可变的参数,依次输出
        Flux.just("hello", "world", "just").doOnNext(System.out::println)
                .doOnComplete(() -> System.out.println("just over")).subscribe();
        //传入一个范围
        Flux.range(100, 10)
                .doOnNext(System.out::println).doOnComplete(() -> System.out.println("OK")).subscribe();
        //传入list
        Flux.fromIterable(Arrays.asList("01", "02", "03")).doOnNext(System.out::println).subscribe();
        //传入一个数组
        Flux.fromArray(new Object[]{"obj1", "obj2"}).doOnNext(System.out::println).subscribe();
    }


    /**
     * 处理空值
     */
    @Test
    public void testEmpty() {
        //如果序列是个空的,就给个默认值
        Flux.empty().defaultIfEmpty(1).doOnNext(System.out::println).subscribe();
        //如果序列是空的,就用新序列代替
        Flux.empty().switchIfEmpty(Mono.just("100")).doOnNext(System.out::println).subscribe();
    }


    /**
     * 序列在执行时的一些监听方法doOnXXXX
     */
    @Test
    public void testDoOn() {
        System.out.println("----------");
        Flux.range(100, 10)
                .doOnNext(System.out::println).doOnComplete(() -> System.out.println("OK"));

        System.out.println("----------");
        Flux.range(100, 10).doFirst(() -> System.out.println("第一个执行开始")).subscribe();

        System.out.println("----------");
        Flux.range(100, 10).doFinally(it -> System.out.println("终止信号的类型为" + it.name())).subscribe();

        System.out.println("----------");
        Flux.range(100, 10).doOnSubscribe(it -> System.out.println("该序列已被订阅")).subscribe();

        System.out.println("----------");

        Flux.range(100, 10).doOnRequest(value -> System.out.println("doOnRequest:" + value)).subscribe();

        //在完成或者error时,也就是序列终止时执行runnable
        System.out.println("----------");
        Flux.range(100, 10).doOnTerminate(() -> System.out.println("doOnTerminate")).subscribe();

        //doOnEach每次向下游传播,都会得到一个信号类型,可以根据该信号类型执行一些操作
        System.out.println("----------");
        Flux.range(100, 10).doOnEach(it -> System.out.println("doOnEach:" + it)).subscribe();
    }


    /**
     * filter用法
     */
    @Test
    public void testFilter() {

        System.out.println("----------");
        //将上游的数据进行类型判断,符合该类型的数据将流向下游
        Flux.just(new Object(), "Hello", 1)
                .ofType(String.class).doOnNext(System.out::println)
                .doOnComplete(() -> System.out.println("过滤String示例")).subscribe();

        System.out.println("----------");
        //过滤数据
        Flux.range(100, 10)
                .filter(it -> it > 105)
                .doOnComplete(() -> System.out.println("取出大于105示例")).subscribe();

        System.out.println("----------");
        //将重复数据过滤,重复数据在整个序列中只保留一个
        Flux.range(100, 10)
                .concatWith(Flux.just(100, 100, 100))
                .distinct().doOnNext(System.out::println)
                .doOnComplete(() -> System.out.println("去除重复数字示例")).subscribe();

        System.out.println("----------");

        //将后来的重复数据过滤,如下,第二个flux拼接到第一个序列时,只会把第二个元素本身的重复元素过滤
        Flux.range(100, 10)
                .concatWith(Flux.just(100, 100, 100))
                .distinctUntilChanged().doOnNext(System.out::println)
                .doOnComplete(() -> System.out.println("将后来的重复数据过滤")).subscribe();

        System.out.println("----------");
        //在序列的开始获取5个元素,
        // limitRequest为true时,则不管该序列会发射多少元素,该参数会向上传递背压,则上游序列只会发出设定的5个元素
        //为false时,则不控制上有元素可以发出N个元素
        Flux.range(100, 10).take(5, false)
                .doOnComplete(() -> System.out.println("在序列的开始获取5个元素")).subscribe();

        System.out.println("----------");
        //参数为时间单位,意味着take获取元素,只会在该时间限制内获取。
        Flux.range(100, 10).take(Duration.ofSeconds(10))
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) {
                        System.out.println("当前时间戳为:" + System.currentTimeMillis() + ",数字为:" + integer);
                    }
                })
                .doOnComplete(() -> System.out.println("在指定时间内获取元素"))
                .subscribe(System.out::println);

        System.out.println("----------");
        //获取最后的N位元素
        Flux.range(100, 10).takeLast(2)
                .doOnComplete(() -> System.out.println("获取最后的2位元素"))
                .subscribe(System.out::println);

        System.out.println("----------");
        //获取元素,知道符合条件后停止向下游发送数据,包括条件本身,也就是当it>105的元素也会被发布至下游
        Flux.range(100, 10).takeUntil(it -> it > 105)
                .doOnComplete(() -> System.out.println("一直取数,直到大于105结束"))
                .subscribe(System.out::println);

        System.out.println("----------");
        //获取元素,当元素符合该断言时,如果不符合直接终止,不包含条件本身
        Flux.range(100, 10).takeWhile(it -> it < 105)
                .doOnComplete(() -> System.out.println("取出小于105示例"))
                .subscribe(System.out::println);

        System.out.println("----------");
        //获取指定某个位置的一个元素
        Flux.range(100, 10).elementAt(0)
                .doOnSuccess(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer i) {
                        System.out.println("获取指定某个位置的一个元素:" + i);
                    }
                })
                .subscribe();

        System.out.println("----------");
        //获取最后一个元素,last()如果为空则抛出异常,last(1)如果为空则发出默认值
        Flux.range(100, 10)
                .takeWhile(it -> it > 105).last(1)
                .subscribe(System.out::println);

        System.out.println("----------");
        //跳至第几秒开始执行
        Flux.range(100, 10)
                .skip(Duration.ofSeconds(5)).subscribe(System.out::println);

        System.out.println("----------");
        //跳至第几个元素开始执行
        Flux.range(100, 10)
                .skip(5).subscribe(System.out::println);

        System.out.println("----------");
        //从开始跳到最后第N个元素结束
        Flux.range(100, 10).skipLast(5).subscribe(System.out::println);

        System.out.println("----------");
        //跳至满足条件的地方开始执行,从第一个元素开始,知道满足条件,开始发送至下游
        Flux.range(100, 10).skipUntil(it -> it > 105).subscribe(System.out::println);

        System.out.println("----------");
        //每隔一段时间抽取样本数(取在这个时间的最后一个元素),如果相隔实现大于序列的执行时间,则去最后一元素
        Flux.range(100, 100000000).sample(Duration.ofMillis(100)).subscribe(System.out::println);

        System.out.println("----------");
        //每隔一段时间抽取样本数(取在这个时间的第一个元素),如果相隔实现大于序列的执行时间,则取第一个元素
        Flux.range(100, 10).sampleFirst(Duration.ofMillis(100)).subscribe(System.out::println);

        System.out.println("----------");
        //只获取一个元素,single()如果为空或者超多一个,抛出异常,single(1)如果为空返回默认值,如果多个抛出异常,singleOrEmpty()可以允许为空
        Flux.range(100, 10).single(1).subscribe(System.out::println);
    }


    /**
     * 当被订阅后如果发生异常,则stream会停止运行
     * 此时可以通过处理error来决定如何处理异常
     * 可以将异常跳过、将异常替换等
     */
    @Test
    public void testErrorHandle() {
        System.out.println("----------");
        Flux.just(1, 2, 3, 0, 5, 4).map(it -> {
                    it = 100 / it;
                    return it;
                })
                //报错后返回,并停止运行
                .onErrorResume(e -> {
                    return Mono.just(10000);
                })
                .doFinally(type -> {
                    System.out.println(type);
                })
                .subscribe(System.out::println);

        System.out.println("----------");
        Flux.just(1, 2, 3).doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) {
                System.out.println(integer);
                if (integer == 2) {
                    throw new RuntimeException("触发异常");
                }
            }
        }).doOnError(new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) {
                System.out.println("doOnError:" + throwable.getMessage());
            }
        }).subscribe();


        System.out.println("----------");
        Flux.just(1, 2, 3, 0, 5, 4).map(it -> {
                    it = 100 / it;
                    return it;
                })
                //报错后继续运行,并执行相关操作
                .onErrorContinue((e, it) -> {
                    System.out.println(e.getMessage());
                })
                .doFinally(type -> {
                    System.out.println(type);
                })
                .subscribe(System.out::println);
    }

    @Test
    public void flatMapTest() {
        //输出50,100
        Flux.just(5, 10).flatMap(x -> Flux.just(x * 10)).toStream().forEach(System.out::println);


    }
}

参考文章

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/980997.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python实现猎人猎物优化算法(HPO)优化循环神经网络回归模型(LSTM回归算法)项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档视频讲解&#xff09;&#xff0c;如需数据代码文档视频讲解可以直接到文章最后获取。 1.项目背景 猎人猎物优化搜索算法(Hunter–prey optimizer, HPO)是由Naruei& Keynia于2022年提出的一种最新的…

[machine learning]误差分析,模型分析

1.目的是什么 当我们找到一个算法去计算某些东西的时候,我们通常要对这个算法进行一定的分析,比如时间复杂度,空间复杂度(前者更加重要),来进行比较,判断一个算法的优劣性. 对于一个训练的模型来说,同样需要某种模型来进行分析,例如代价函数等等,通过比较拟合程度,正确精度等…

如何安装ideaIU-2020.3.3并使用无限重置插件达到永久使用

第一步、双击安装idea程序 第二步、选中64位与更新环境变量 第三步、安装完成后双击打开 1、选中 Evaluate for free – Evaluate 进入试用期 第四步、打开后选择Plugins – 设置 1、安装来自磁盘 Install Plugin from Disk 第五步、选中我们的无限重置插件&#xff0c;安装 …

Golang RSA 生成密钥、加密、解密、签名与验签

文章目录 1.RSA2.Golang 实现 RSA生成密钥加密解密签名验签 3.dablelv/cyan参考文献 1.RSA RSA 是最常用的非对称加密算法&#xff0c;由 Ron Rivest、Adi Shamir、Leonard Adleman 于1977 年在麻省理工学院工作时提出&#xff0c;RSA 是三者姓氏首字母的拼接。 它的基本原理…

Vue2基础速通

文章目录 Vue基础速通前言1、Vue概述2、快速入门3、模板语法4、数据绑定5、el和data的两种写法6、数据代理7、事件处理7.1 快速入门7.2 事件修饰7.3 键盘事件 8、计算属性9、监视属性9.2 快速入门9.2 深度监视9.3 知识拓展 10、动态绑定样式11、条件渲染12、列表渲染13、key的作…

【算法训练-链表 三】【判断】判断链表中是否有环、判断链表是否为回文链表

废话不多说&#xff0c;喊一句号子鼓励自己&#xff1a;程序员永不失业&#xff0c;程序员走向架构&#xff01;本篇Blog的主题是【链表的相关判断】&#xff0c;使用【链表】这个基本的数据结构来实现&#xff0c;这个高频题的站点是&#xff1a;CodeTop&#xff0c;筛选条件为…

【HTML5高级第二篇】WebWorker多线程、EventSource事件推送、History历史操作

文章目录 一、多线程1.1 概述1.2 体会多线程1.3 多线程中数据传递和接收 二、事件推送2.1 概述2.2 onmessage 事件 三、history 一、多线程 1.1 概述 前端JS默认按照单线程去执行&#xff0c;一段时间内只能执行一件事情。举个栗子&#xff1a;比方说古代攻城游戏&#xff0c…

animate.css与vue中的v-if/v-show如何一起使用

第一步:在已有的vue项目中安装animate.css npm install animate.css --save第二步&#xff1a;在 main.js 引入 import animate.css第三步&#xff1a;如果在vue中使用了v-if 或者v-show 的话就不能直接在元素上加入animate的class。我们应该在v-if/v-show的元素外层添加tran…

微服务整合Seata1.5.2+Nacos2.2.1+SpringBoot

文章目录 一、下载seata server二、客户端配置-application.yml三、初始Mysql数据库四、导入初始配置到nacos五、启动测试 本文以seata-server-1.5.2&#xff0c;以配置中心、注册中心使用Nacos&#xff0c;store.modedb&#xff08;mysql&#xff09;为例进行操作。 Seata简介…

vue-cli3项目本地启用https,并用mkcert生成证书

在项目根目录下的vue.config.js文件中&#xff1a; // vue.config.js module.exports {devServer: {host:dev.nm.cngc// 此处开启 https,并加载本地证书&#xff08;否则浏览器左上角会提示不安全&#xff09;https: {cert: fs.readFileSync(path.join(_dirname,./cert.crt)…

Linux查端口占用的几种方式

在Linux中&#xff0c;你可以使用以下几种方式来查看端口的占用情况。 一、使用netstat命令 #安装netstat yum -y install net-tools #检测端口占用 netstat -npl | grep 端口# 几种常规用法 netstat -ntlp //查看当前所有tcp端口 netstat -ntulp | grep 80 //查看所有80端…

科技成果鉴定测试报告一般包含哪些测试内容?

软件测评报告 一、科技成果评价是需要做第三方软件测评报告&#xff0c;一般是证明技术指标点是否完善&#xff0c;覆盖主要申报内容&#xff0c;应用软件项目科技成果鉴定测试内容&#xff1a; &#xff08;一&#xff09;是否完成合同或计划任务书要求的指标&#xff1b; …

Chaes恶意软件的新Python变种以银行和物流业为目标

银行和物流行业正遭受一种名为 "Chaes "恶意软件变种的攻击。 Morphisec 在与《黑客新闻》分享的一份新的详细技术报告中说&#xff1a;“Chaes”经历了重大的改版&#xff0c;从完全用 Python 重写&#xff0c;到整体重新设计和增强通信协议&#xff0c;导致传统防…

upload-labs1-21关文件上传通关手册

upload-labs文件上传漏洞靶场 目录 upload-labs文件上传漏洞靶场第一关pass-01&#xff1a;第二关Pass-02第三关pass-03&#xff1a;第四关pass-04&#xff1a;第五关pass-05&#xff1a;第六关pass-06&#xff1a;第七关Pass-07第八关Pass-08第九关Pass-09第十关Pass-10第十一…

TikTok运营秘籍:助力品牌增长

在数字化时代&#xff0c;TikTok已成为品牌推广的热门渠道。本文将分享关于TikTok运营的关键策略和技巧&#xff0c;助您快速引爆品牌增长。 一.了解TikTok特点与用户群体 理解TikTok平台的特点和用户群体是成功运营的基础。TikTok以短视频为主&#xff0c;追求创意、趣味和娱…

怎么做手机App测试?app测试详细流程和方法介绍

APP测试 1、手机APP测试怎么做&#xff1f; 手机APP测试&#xff0c;主要针对的是android和ios两大主流操作系统&#xff0c;主要考虑的就是功能性、兼容性、稳定性、易用性&#xff08;也就是人机交互&#xff09;、性能。 手机APP测试前的准备&#xff1a; 1.使用同类型的…

【AI】机器学习——朴素贝叶斯

文章目录 2.1 贝叶斯定理2.1.1 贝叶斯公式推导条件概率变式 贝叶斯公式 2.1.2 贝叶斯定理2.1.3 贝叶斯决策基本思想 2.2 朴素贝叶斯2.2.1 朴素贝叶斯分类器思想2.2.2 条件独立性对似然概率计算的影响2.2.3 基本方法2.2.4 模型后验概率最大化损失函数期望风险最小化策略 2.2.5 朴…

网络技术六:TCP/UDP原理

TCP/UDP原理 命令行操作基础 命令类型 常见设备管理命令 H3C路由交换产品连接方法 使用console线本地连接 协议Serial&#xff0c;接口com口&#xff0c;波特率9600 适用于设备的初次调试 使用Telnet远程访问 适用于设备上架配置好后的维护管理 使用SSH远程访问 数据传输…

【Hive SQL 每日一题】统计用户连续下单的日期区间

文章目录 测试数据需求说明需求实现 测试数据 create table test(user_id string,order_date string);INSERT INTO test(user_id, order_date) VALUES(101, 2021-09-21),(101, 2021-09-22),(101, 2021-09-23),(101, 2021-09-27),(101, 2021-09-28),(101, 2021-09-29),(101, 20…

【Java SE】抽象类与接口

目录 【1】抽象类 【1.1】抽象类概念 【1.2】抽象类语法 【1.3】抽象类特性 【1.4】抽象类的作用 【2】接口 【2.1】接口的概念 【2.2】语法规则 【2.3】接口使用 【2.4】接口特性 【2.5】实现多个接口 【2.6】接口间的继承 【2.7】接口使用实例 【2.8】Clonable …