Reactor手册

news2025/1/11 21:35:51

Flux

Flux 是一个发出0-N个元素组成的异步序列的Publisher,可以被onComplete信号或者onError信号所终止。

Flux.just("Hello", "World").subscribe(System.out::println);

// fromArray(),fromIterable(),fromStream()

Flux.fromArray(new Integer[] {1, 2, 3}).subscribe(System.out::println);

Flux.empty().subscribe(System.out::println);

Flux.range(1, 10).subscribe(System.out::println);

Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);

Flux.intervalMillis(1000).subscribe(System.out::println);

flux1.firstEmittingWith(flux2); // 谁先有数据就用谁,跟mono的or差不多
// 同步生成元素

Flux.generate(sink -> {

    sink.next("Hello");

    sink.complete();

}).subscribe(System.out::println);



final Random random = new Random();

Flux.generate(ArrayList::new, (list, sink) -> {

    int value = random.nextInt(100);

    list.add(value);

    sink.next(value);

    if (list.size() == 10) {

        sink.complete();

    }

    return list;

}).subscribe(System.out::println);



// 可异步可同步

Flux.create(sink -> {

    for (int i = 0; i < 10; i++) {

        sink.next(i);

    }

    sink.complete();

}).subscribe(System.out::println);



// sink的背压

sink.onRequest(n -> channel.poll(n))

        .onCancel(() -> channel.cancel())

        .onDispose(() -> channel.close()) 

Mono

Mono 是一个发出0-1个元素的Publisher,可以被onComplete信号或者onError信号所终止。

Mono.just(T) // 饥渴的。Mono. defer或fromSupplier 等是懒加载的

Mono.empty() / Mono.justOrEmpty(Optional<T>/T)

flux.then(); // 返回一个mono。是下一步的意思,但它只表示执行顺序的下一步,不依赖于上一步的结果。then() 方法的参数只是一个 Mono,无从接受上一步的执行结果。而 flatMap() 和 map() 的参数都是一个 Function,入参是上一步的执行结果。

Mono.never()

fromCallable(),fromCompletionStage(),fromFuture(),fromRunnable()和fromSupplier() 分别从Callable,CompletionStage,CompletableFuture,Runnable和Supplier中创建Mono

ignoreElements(Publisher source):创建一个 Mono 序列,忽略作为源的 Publisher 中的所有元素,只产生结束消息。

Mono.fromSupplier(() -> "Hello").subscribe(System.out::println);

Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println);

Mono.create(sink -> sink.success("Hello")).subscribe(System.out::println);

Mono.defer(()->{…});

mono1.or(mono2); // 谁先emit第一个可用的signal就用谁
// 传统的命令式编程

Object result1 = doStep1(params);

Object result2 = doStep2(result1);

Object result3 = doStep3(result2);

// 对应的反应式编程

Mono.just(params)

    .flatMap(v -> doStep1(v))

    .flatMap(v -> doStep2(v))

    .flatMap(v -> doStep3(v));

逻辑判断

filter

对流中包含的元素进行过滤,只留下满足Predicate指定条件的元素

Flux.range(1, 10).filter(i -> i%2 == 0).subscribe(System.out::println);

any/all/ hasElements/ hasElement/ ofType

Flux.range(1, 6).any(f -> f < 0).subscribe(t -> System.out.println("any逻辑操作:"+t));

Flux.range(1, 6).all(f -> f > 0).subscribe(t -> System.out.println("all逻辑操作:"+t));

take/skip

take系列操作符用来从当前流中提取元素。Skip跳过元素

// 第一行语句输出的是数字 1 到 10;

// 第二行语句输出的是数字 991 到 1000;

// 第三行语句输出的是数字 1 到 9;

// 第四行语句输出的是数字 1 到 10,使得 Predicate 返回 true 的元素也是包含在内的

Flux.range(1, 1000).take(10).subscribe(System.out::println);

Flux.range(1, 1000).takeLast(10).subscribe(System.out::println);

Flux.range(1, 1000).takeWhile(i -> i < 10).subscribe(System.out::println);

Flux.range(1, 1000).takeUntil(i -> i == 10).subscribe(System.out::println);

Flux.range(0, 10).skip(5).subscribe(f -> System.out.println("剩下的元素:"+f));

操作符

mergeeagerly,会提前准备好数据,更快,但更消耗内存)

mergemergeSequential操作符用来把多个流合并成一个Flux序列. merge按照所有流中元素的实际产生序列来合并,而mergeSequential按照所有流被订阅的顺序

//进行合并的流都是每隔 100 毫秒产生一个元素,不过第二个流中的每个元素的产生都比第一个流要延迟 50 毫秒。在使用 merge 的结果流中,来自两个流的元素是按照时间顺序交织在一起;而使用 mergeSequential 的结果流则是首先产生第一个流中的全部元素,再产生第二个流中的全部元素

Flux.merge(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5)).toStream().forEach(System.out::println);

Flux.mergeSequential(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5)).toStream().forEach(System.out::println);

concatMap (lazy;按顺序串行生成元素不能并行,内存消耗更小,默认Sequential排好序的)

flatMapeagerly;会提前准备好数据,更快,但更消耗内存,flat能并行生成元素)

flatMapflatMapSequential操作符把流中的每个元素转换成一个流,再把所有流中的元素进行合并。Sequential结果会跟原始元素顺序一致

startWith/ concatWith

在开头添加:Flux#startWith(T...)

在最后添加:Flux#concatWith(T...)

handle

类似map 与 filter 的组合

Flux<String> alphabet = Flux.just(-1, 30, 13, 9, 20)

    .handle((i, sink) -> {

        String letter = alphabet(i);

        if (letter != null)

            sink.next(letter);

    });

alphabet.subscribe(System.out::println);

zip(能配对多个)zip(a,b,c..)

zipWith只配对两个a.zipWith(b)

zipWith操作符把当前流中的元素与另一个流中的元素按照一对一的方式进行合并。在合并时可以不做任何处理,由此得到的是一个元素类型为Tuple2的流;也可以通过一个BiFunction函数对合并的元素进行处理,所得到的流的元素类型为该函数的返回值

Flux.just("a", "b").zipWith(Flux.just("c", "d")).subscribe(System.out::println);

Flux.just("a", "b").zipWith(Flux.just("c", "d"), (s1, s2) -> String.format("%s-%s", s1, s2)).subscribe(System.out::println);

combineLatest

把所有流中的最新产生的元素合并成一个新的元素,作为返回结果流中的元素

// 流中最新产生的元素会被收集到一个数组中,通过 Arrays.toString 方法来把数组转换成 String

Flux.combineLatest(

        Arrays::toString,

        Flux.intervalMillis(100).take(5),

        Flux.intervalMillis(50, 100).take(5)

).toStream().forEach(System.out::println);

reduce

对流中包含的所有元素进行累积操作,得到一个包含计算结果的 Mono 序列。在操作时可以指定一个初始值。如果没有初始值,则序列的第一个元素作为初始值。

Flux.range(1,100).reduce((x, y) -> x + y).subscribe(System.out::println);

Flux.range(1,100).reduceWith(()->100,(x,y)->x + y).subscribe(System.out::println);

distinct

Flux.just(1, 2, 2, 3, 3, 4, 5, 5).distinct().subscribe(f -> System.out.println("去重后的元素:"+f));

transform/ compose

可以将一段操作链封装为一个函数式function。相当于别名和宏.

Function<Flux<String>, Flux<String>> filterAndMap =

f -> f.filter(color -> !color.equals("orange"))

      .map(String::toUpperCase);

Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))

        .doOnNext(System.out::println)

        .transform(filterAndMap)

        .subscribe(d -> System.out.println("Subscriber to Transformed MapAndFilter: "+d));

switchIfEmpty在源为空的情况下,替换成另一个流源

defaultIfEmpty当调用者执行完且没有任何数据返回时,返回默认值

then

…​并且我希望用 Mono 来表示序列已经结束:then

…​并且我想在序列结束后等待另一个任务完成:thenEmpty

…​并且我想在序列结束之后返回一个 Mono:Mono#then(mono)

…​并且我想在序列结束之后返回一个值:Mono#thenReturn(T)

…​并且我想在序列结束之后返回一个 Flux:thenMany

and

前后两个流都等待结束,并返回一个空mono

when

当参数的流都完成时返回空mono

区间/集合

buffer/bufferTimeout

public final Flux<List<T>> buffer(int maxSize)

// 第一行语句输出的是 5 个包含 20 个元素的数组;

// 第二行语句输出的是 2 个包含了 10 个元素的数组;

// 第三行语句输出的是 5 个包含 2 个元素的数组。每当遇到一个偶数就会结束当前的收集;

// 第四行语句输出的是 5 个包含 1 个元素的数组,数组里面包含的只有偶数。

Flux.range(1, 100).buffer(20).subscribe(System.out::println);

Flux.intervalMillis(100).bufferMillis(1001).take(2).toStream().forEach(System.out::println);

Flux.range(1, 10).bufferUntil(i -> i % 2 == 0).subscribe(System.out::println);

Flux.range(1, 10).bufferWhile(i -> i % 2 == 0).subscribe(System.out::println);

window

返回值Flux<Flux>

// 两行语句的输出结果分别是 5 个和 2 个字符

Flux.range(1, 100).window(20).subscribe(System.out::println);

Flux.intervalMillis(100).windowMillis(1001).take(2).toStream().forEach(System.out::println);

collectList/ collectMap

用于将含有多个元素的Flux转换为含有一个元素列表的Mono

Flux.range(1, 6).collectList().subscribe(f -> System.out.println("组成list后的元素:"+f));

Context

类似于 ThreadLocal, 一个 Context 是绑定到每一个链中的 Subscriber 上的. 它的内容只能被subscriberContext上游的操作符看到

String key = "message";

Mono<String> r = Mono.just("Hello")

                .flatMap( s -> Mono.subscriberContext().map( ctx -> s + " " + ctx.get(key)))

                .subscriberContext(ctx -> ctx.put(key, "World"));

StepVerifier.create(r).expectNext("Hello World").verifyComplete();

消息处理

subscribe订阅

当没有订阅时发布者什么也不做。

发出元素:doOnNext

序列完成:Flux#doOnComplete,Mono#doOnSuccess

因错误终止:doOnError

取消:doOnCancel

订阅时:doOnSubscribe

请求时:doOnRequest

完成或错误终止:doOnTerminate(Mono的方法可能包含有结果)

但是在终止信号向下游传递 之后 :doAfterTerminate

所有类型的信号(Signal):Flux#doOnEach

所有结束的情况(完成complete、错误error、取消cancel):doFinally

subscribe();

subscribe(Consumer<? super T> consumer);

subscribe(Consumer<? super T> consumer,

          Consumer<? super Throwable> errorConsumer);

subscribe(Consumer<? super T> consumer,

          Consumer<? super Throwable> errorConsumer,

          Runnable completeConsumer);

subscribe(Consumer<? super T> consumer,

          Consumer<? super Throwable> errorConsumer,

          Runnable completeConsumer,

          Consumer<? super Subscription> subscriptionConsumer);

继承BaseSubscriber,重写hookOnSubscribe、hookOnNext、hookOnError、hookOnCancel、 hookOnComplete、hookFinally

error处理

响应式流中的任何错误都是一个终止事件。 即使用了错误处理操作符,也不会让源头流序列继续。而是将 onError 信号转化为一个 新的 序列 的开始。换句话说,它代替了被终结的 上游 流序列。

Flux/Mono.error(Throwable error) 创建一个只包含错误消息的序列

Flux.just(1,2).concatWith(Mono.error(new IllegalStateException())).subscribe(System.out::println, System.err::println);

Flux.just(1,2).concatWith(Mono.error(new IllegalStateException())).onErrorReturn(0).subscribe(System.out::println);

Flux.just(1,2).concatWith(Mono.error(new IllegalStateException())).switchOnError(Mono.just(0)).subscribe(System.out::println);

Flux.just(1,2).concatWith(Mono.error(new IllegalArgumentException())).onErrorResume(e -> {

  if(e instanceof IllegalStateException)

    return Mono.just(0);

  else if(e instanceof IllegalArgumentException)

    return Mono.just(-1);

  return Mono.epmty();

}).subscribe(System,.out::println);




//不改变它的情况下做出响应(如记录日志),并让错误继续传递下去,

Flux.just("unknown").flatMap(k -> callExternalService(k))

    .doOnError(e -> {

        failureStat.increment();

        log("uh oh, falling back, service failed for key " + k);

    })

    .onErrorResume(e -> getFromCache(k));

retry

Flux.just(1,2).concatWith(Mono.error(new IllegalStateException())).retry(1).subscrible(System.out::println);

阻塞和非阻塞相互转化

// 非阻塞->阻塞

mono.block();

flux.toIterable();

Flux.toStream

Flux.blockLast

// 阻塞->非阻塞

Flux<Person> asyncPersonLookup(PersonRepository repository){

    return Flux.defer(() -> Flux.fromIterable(repository.findAll()))

.subscribeOn(Schedulers.elastic());

}

Mono<Void> asyncSavePersons(Flux<Person>flux,PersonRepository repository){

    return flux.publishOn(Schedulers.parallel())

               .doOnNext(repository::save)

               .then();

}

调度/线程

通过publishOn()和subscribeOn()方法可以切换执行操作调度器。publishOn()方法切换的是操作符的执行方式,而subscribeOn()方法切换的是产生流中元素时的执行方式。

publishOn会 改变后续的操作符的执行所在线程 (直到下一个 publishOn 出现在这个链上)

Flux.create(sink -> {

  sink.next(Thread.currentThread().getName());

  sink.complete(); 

}).publishOn(Schedulers.single())

.map(x ->  String.format("[%s] %s", Thread.currentThread().getName(), x))

.publishOn(Schedulers.elastic())

.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))

.subscribeOn(Schedulers.parallel())

.toStream()

.forEach(System.out::println);

Schedulers.newSingle()

Schedulers.newElastic(yourScheduleName)

测试

StepVerifier

StepVerifier的作用是可以对序列中包含的元素进行逐一验证。通过StepVerifier.create()方法对一个流进行包装之后再进行验证。expectNext()方法用来声明测试时所期待的流中的下一个元素的值,而verifyComplete()方法则验证流是否正常结束。verifyError()来验证流由于错误而终止。

StepVerifier.create(Flux.just(a,b))

.expectNext("a").expectNext("b").verifyComplete();

TestPublisher

这个类本质上是一个 Publisher<T>, 你可以通过可编程的方式来用它发出各种信号。

final TestPublisher<String> testPublisher = TestPublisher.creater();

testPublisher.next("a");

testPublisher.next("b");

testPublisher.complete();

StepVerifier.create(testPublisher)

    .expectNext("a")

    .expectNext("b")

    .expectComplete();

日志

Flux.range(1, 2).log("Range").subscribe(System.out::println);

调试

在调试模式启用之后,所有的操作符在执行时都会保存额外的与执行链相关的信息。当出现错误时,这些信息会被作为异常堆栈信息的一部分输出

Hooks.onOperator(providedHook -> providedHook.operatorStacktrace());

Flux.just(1, 0).map(x -> 1/x).checkpoint("test").subscribe(System.out::println);

引用

GitHub & BitBucket HTML Preview

Flux、Mono、Reactor 实战(史上最全)_普通网友的博客-CSDN博客_mono.create

聊聊Spring Reactor反应式编程 - 知乎

使用 Reactor 进行反应式编程_Jaemon的博客-CSDN博客_reactor3 反应式

reactor Mono.defer_罗小爬EX的博客-CSDN博客_reactor defer

https://www.baeldung.com/reactor-combine-streams

Reactor 3 (11): 数据扁平处理flatMap、concatMap_泛泛之素的博客-CSDN博客

reactor merge vs concat、flatMap vs concatMap_罗小爬EX的博客-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/83526.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

K8s 核心组件介绍

目录前言一、控制平面组件1.1 kube-apiserver1.2 etcd1.3 kube-scheduler1.4 kube-controller-manager1.5 cloud-controller-manager二、Node 组件2.1 kubelet2.2 kube-proxy2.3 Container Runtime前言 一个完整的 K8s 集群由一组节点&#xff08;node&#xff09;服务器组成&…

组队-蓝桥杯

题目描述 本题为填空题&#xff0c;只需要算出结果后&#xff0c;在代码中使用输出语句将所填结果输出即可。 作为篮球队教练&#xff0c;你需要从以下名单中选出 11 号位至 55 号位各一名球员&#xff0c;组成球队的首发阵容。 每位球员担任 11 号位至 55 号位时的评分如下…

关于人脸检测和人脸关键点检测的详解(涉及Opencv 和Dlibd)

关于人脸识别&#xff0c;大家入门opencv&#xff0c;最常见的是用opencv级联分类器器里面的函数进行人脸的识别&#xff08;当然里面包含很多各种物体的分类器&#xff0c;大家可以一一测试&#xff09;&#xff0c;今天我们来练一下关于人脸识别的级联器。 1&#xff0c;ope…

数据仓库(DW)、数据湖、数据中台的关系

一句话说明&#xff1a;数据中台是一套体系&#xff0c;既不是工具又不是存储&#xff0c;它可以包含数据湖和数据仓库。 数据仓库 数据仓库是一个面向主题的、集成的、随时间变化但信息本身相对稳定的数据集合&#xff0c;用于支持管理决策过程。其本质就是完成从面向业务过程…

[附源码]Python计算机毕业设计Django-菜篮子系统

项目运行 环境配置&#xff1a; Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术&#xff1a; django python Vue 等等组成&#xff0c;B/S模式 pychram管理等等。 环境需要 1.运行环境&#xff1a;最好是python3.7.7&#xff0c;…

一文学会jenkins pipline自动化构建

01 Pipeline流水线基本语法 首先创建 在jenkins上创建一个pipeline的流水线任务 新建ITEM–>选择流水线 基本pipeline脚本结构 pipeline {//agent 表示要执行的节点&#xff0c;any表示任意节点 agent any //stages表示任务执行时的所有步骤集合 stages { /…

企业销售CRM的主要优势是什么?

民营企业商品销售CRM的主要就竞争优势是什么? 新一代研究说明&#xff0c;由于差劲的顾客新体验&#xff0c;或内公司每月经济损失750亿元。为了更快地介绍您的顾客&#xff0c;您须要两个智能化的顾客管理工作系统。因而&#xff0c;您能提供更多直接影响您的商品销售的高质…

Python调用C++

1 背景 python被称为胶水语言&#xff0c;其优势是能够粘结各种不同的语言。同时&#xff0c;python有着更大的“亲民性”&#xff0c;很容易进行开发。但是&#xff0c;python最大的问题就是计算速度不够。通常可以用CUDA或者C对一个python程序进行加速&#xff0c;加速策略如…

【LeetCode】单词搜索 II [H](前缀树)

212. 单词搜索 II - 力扣&#xff08;LeetCode&#xff09; 一、题目 给定一个 m x n 二维字符网格 board 和一个单词&#xff08;字符串&#xff09;列表 words&#xff0c; 返回所有二维网格上的单词 。 单词必须按照字母顺序&#xff0c;通过 相邻的单元格 内的字母构成&am…

QT系列第2节 QT中元对象系统

QT是在标准C上进行了扩展&#xff0c;所以就有自己的特性&#xff0c;其中元对象系统就是其一。元对象系统有点类似于java和go语言中的反射&#xff0c;让我们在编程时解决问题多了些方法和思路&#xff0c;关于元对象可以简单总结出以下内容项。 目录 一.元对象要点总结 二…

Linux转发性能评估与优化之——转发瓶颈分析与解决方案

线速问题 很多人对这个线速概念存在误解。认为所谓线速能力就是路由器/交换机就像一根网线一样。而这&#xff0c;是不可能的。应该考虑到的一个概念就是延迟。数据包进入路由器或者交换机&#xff0c;存在一个核心延迟操作&#xff0c;这就是选路&#xff0c;对于路由器而言&…

软件工程复习简略

软件工程复习简略1.什么是软件生存周期&#xff1f;通常可划分为哪些阶段&#xff1f;2.简述需求分析要经过哪些步骤&#xff0c;每个步骤的作用。3.详细设计有哪些常用工具&#xff1f;&#xff08;注意Pad图的画法&#xff09;4.软件测试的目的和原则是什么&#xff1f;5.测试…

pythonselenium自动化测试实战项目(完整、全面)

前言 之前的文章说过&#xff0c; 要写一篇自动化实战的文章&#xff0c; 这段时间比较忙再加回家过11一直没有更新博客&#xff0c;今天整理一下实战项目的代码共大家学习。&#xff08;注:项目是针对我们公司内部系统的测试&#xff0c;只能内部网络访问&#xff0c;外部网络…

【SpringCloud负载均衡】【源码+图解】【三】LoadBalancer的工作原理

【SpringCloud负载均衡】【源码图解】【二】LoadBalancer配置 目录3. LoadBalancer的工作原理3.1 创建LoadBalancerRequest3.2 创建上下文3.2.1 properties3.2.2 configurations3.2.3 contexts3.3 获取ReactiveLoadBalancer3.4 获取ServiceInstance3.5 向serviceInstance请求结…

Java要学到什么程度才可以找工作?

Java为不同的集合提供了一个集合框架。集合基于数据结构&#xff0c;比如常见的&#xff1a;列表、数组、集合、哈希图等等。因此&#xff0c;在研究集合时&#xff0c;最好了解一点数据结构的相关知识。 主要副题&#xff1a; List Set Map ArrayList LinkedList Queue…

web3:智能合约-虚拟机(EVM、HVM、WASM、MOVE)

在区块链上&#xff0c;用户通过运行部署在区块链上的合约&#xff0c;完成需要共识的操作。而为智能合约提供运行环境的便是对应的虚拟机。 目录EVM基础概念技术细节EVM的存储模型交易在EVM的执行普通转账交易智能合约的创建或者调用EVM机器语言与现有的虚拟机科技作比较EVM的…

Java中类的复用

类的复用&#xff08;组合与继承&#xff09; 第一种方法&#xff1a;只需在新类中产生现有类的对象&#xff0c;新类由现有类组成&#xff0c;也称为组合&#xff0c;该方法只是复用了现有程序代码的功能&#xff1b; 第二种方法&#xff1a;按现有类来创建新类&#xff0c;…

m基于LPF-VMD和KELM的鸟群优化算法的风速预测算法matlab仿真

目录 1.算法描述 2.仿真效果预览 3.MATLAB核心程序 4.完整MATLAB 1.算法描述 1).使用 LPF-VMD 对风速时间序列进行分解&#xff0c; 得到一个低频的趋势分量以及 n 个由 VMD 分解得 到的 BIMF。 2).对 LPF-VMD 分解得到的各分量分别建立 KELM 预测模型&#xff0c;采用 B…

【进阶】C语言第二课:升级你的指针(1)

目录 &#x1f929;前言&#x1f929;&#xff1a; 一、字符指针&#x1f92f;&#xff1a; 1.字符指针的使用&#x1f99d;&#xff1a; 2.常量字符串&#x1f98a;&#xff1a; 3.相关面试题分析&#x1f423;&#xff1a; 二、指针数组&#x1f9d0;&#xff1a; 三、数…

vue+nodejs公益图书借阅捐赠管理系统

公益图书捐赠管理系统 用户信息&#xff1a;id、用户名、密码、捐书数量&#xff08;管理员端可以点击跳转查看详情&#xff09;、上传电子书数量&#xff08;管理员端可以点击跳转查看详情&#xff09;、借阅图书数量&#xff08;管理员端可以点击跳转查看详情&#xff09;&am…