反应式编程(一)什么是反应式编程

news2025/1/23 4:49:20

目录

    • 一、背景
    • 二、反应式编程简介
      • 2.1 定义
      • 2.2 反应式编程的优势
      • 2.3 命令式编程 & 反应式编程
    • 三、Reactor 入门
      • 3.1 Reactor 的核心类
      • 3.2 Reactor 中主要的方法
        • 1)创建型方法
        • 2)转化型方法
        • 3)其他类型方法
        • 4)举个例子
    • 四、Reactor 的工作原型
      • 4.1 申明阶段
        • 1)Flux.just()
        • 2)map()
        • 3)filter()
        • 4)申明总结
      • 4.2 订阅阶段
        • 1)subscribe()、onSubscribe()
        • 2)request()
        • 3)订阅总结
      • 4.3 Reactor 的工作原理总结
    • 五、补充
      • 5.1 Spring Gateway、WebFlux、Reactor、Netty的依赖关系

一、背景

反应式编程的概念在20世纪90年代开始形成,并在近年来随着云计算、物联网、大数据、移动互联网等技术的发展以及对 高性能、高并发、实时响应和事件驱动系统 需求的增强而逐渐流行起来。

背景和原因主要包括:

  1. 并发挑战: 传统的同步编程模型在处理高并发场景时可能会遇到 线程上下文切换带来的性能损失和资源浪费问题。反应式编程通过异步和非阻塞的方式来处理并发,提高了系统的吞吐量和响应能力。
  2. 数据流处理: 现代应用程序需要处理的数据量越来越大,而且经常涉及到数据流的实时处理和更新。反应式编程通过定义数据流和变换规则,让程序可以自动响应数据流的变化,简化了数据流处理的复杂度。
  3. 事件驱动架构: 在许多应用场景中,如用户交互、消息队列、传感器数据收集等,事件驱动成为主流。反应式编程特别适合处理这类事件驱动的场景,通过订阅和发布机制,程序可以根据事件的发生自动调整行为。
  4. 云原生和微服务: 云环境下的服务间通信常常是非同步的,微服务架构也需要高效的事件响应和错误处理机制。反应时变成理念帮助开发者构建弹性的、响应快速的分布式系统。
  5. 框架和技术支持: 随着 RxJava、Reactor、Akka Streams、Vert.x 等反应式编程库和框架的成熟,开发者有了更好的工具来实现反应式编程模型。

二、反应式编程简介

2.1 定义

反应式编程(Reactive Programming,Rx) 是一种基于 数据流(data stream)和 变化传递(propagation of change)的 声明式(declaractive)的编程范式。

反应式编程是一种编程思想、编程方式,是为了简化并发编程而出现的。与传统的处理方式相比,它能够基于数据流中的事件进行反应处理。例如:a+b=c 的场景:

  • 在传统编程方式下,如果 a、b 发生变化,那么我们需要重新计算 a+b 来得到 c 的新值。
  • 而在反应式编程中,我们不需要重新计算,a、b 的变化事件会触发 c 的值自动更新。这种方式类似于我们在消息中间件中常见的 发布/订阅模式。由流发布事件,而我们的代码逻辑作为订阅方基于事件进行处理,并且是异步处理的。

反应式编程中,最基本的处理单元是事件流中的事件(事件流是不可变的,对流进行操作只会返回新的流)。流中的事件包括:

  • 正常事件: 对象代表的数据、数据流结束标识。
  • 异常事件: 异常对象,例如:Exception。

同时,只有当订阅者订阅发布者后,发布者发布的事件流才会被消费,后续的订阅者只能从订阅点开始消费,但是我们可以通过 背压流控 等方式控制消费。

2.2 反应式编程的优势

反应式编程的核心是基于 事件流、无阻塞、异步的,使用反应式编程不需要编写底层的并发、并发代码。并且由于其声明式编写代码的方式,是的异步代码易读且易维护。反应式编程主要优点有:

  1. 整体采用了观察者模式,异步解耦,提高服务器的吞吐量。
  2. 内部提出了 背压(Backpressure)概念,可以控制消费的速度。
  3. 书写方式与迭代器,stream 类似,方便使用者理解。

2.3 命令式编程 & 反应式编程

命令式编程:

  • 我们普通的编程方法几乎都是命令式编程,按顺序执行一批代码,下一个任务的执行依赖于上一个任务的成功执行,最后等到所有代码执行完毕,我们才能拿到最终的输出结果,这就好像我订阅了一年的报纸,必须是等一年后才能拿到最近这一年的所有报纸。

反应式编程:

  • 类似于生活中真实的订阅报纸。虽然我订阅的是一年的报纸,但是我每天都能收到新的报纸。我们不需要等到所有代码都执行完毕才能取到数据,任务可以是并行处理的,我们可以得到中间的数据,每个任务处理一部分数据,最后进行汇总。

与迭代器对比:

事件IterableObservable
迭代next()onNext()
异常throws ExceptiononError()
完成!hasNext()onComplete()
  • Observable列表示观察者接收到的相关事件时触发的动作。

  • 如果迭代器看作是 “拉模式”,那观测者模式便是 “推模式”。

  • 被观察者(Subscriber)主动的推送数据给订阅者(Subscriber),触发 onNext() 方法。

    出现异常时触发 onError() 方法。

    完成后触发 onCompleted() 方法。

与stream对比:

事件streamObservable
映射map()map()
过滤filter()filter()
  • 与 stream 对比可以看出,Reactive Programming 也是通过类似的数据流方式来处理订阅的数据。
  • 不同点在于 stream 无法控制消息发送速度,而反应式编程中如果 Publisher 发布消息太快,超过了 Subscriber 的处理速度,反应式编程提供了 背压机制 来控制 Publisher 的速度。

三、Reactor 入门

3.1 Reactor 的核心类

  • Mono<T>: 实现了 org.reactivestreams.Publisher 接口,代表 0~1 个元素的发布者。

  • Flux<T>: 同样实现了 org.reactivestreams.Publisher 接口,代表 0~N 个元素的发布者。

    Mono<T> 和 Flux<T> 可以相互转换。多个 Mono 可以合并成一个 Flux<T>,一个 Flux<T> 也可以转化成 Mono<T>。

  • Subscriber: 观察者,用来观察 Publisher 相关动作。

  • Subscription: 解耦 Subscriber 和 Publisher。

3.2 Reactor 中主要的方法

1)创建型方法
  • just(): 根据参数创建数据流。
  • never(): 创建一个不会发出任何数据的无限运行的数据流。
  • empty(): 创建一个不包含任何数据的数据流,不会无限运行。
  • error(): 创建一个订阅后立即返回异常的数据流。
  • concat(): 从多个 Mono<T> 创建 Flux<T>。
  • generate(): 同步、逐一的创建复杂流。重载方法支持生成状态。在方法内部的 Lambda 中通过调用 next() 和 complete()、error() 来指定当前循环返回的流中的元素(并不是return)。
  • create(): 支持同步、异步、批量的生成流中的元素。
  • zip(): 将多个流合并为一个流,流中的元素一一对应。
  • delay(): Mono<T>方法,用于指定流中的第一个元素产生的延迟时间。
  • interval(): Flux<T>方法,用于指定流中各个元素产生时间的间隔(包括第一个元素产生时间的延迟),从 0 开始的 Long 对象组成的流。
  • justOrEmpty(): Mono<T>方法,用于指定当初始化时的值为null时,返回空的流。
  • defaultEmpty(): Mono<T>方法,用于指定当流中元素为空时产生的默认值。
  • range(): 生成一个范围的 Integer 队列。
2)转化型方法
  • map(): 将流中的数据按照逻辑逐个映射为一个新的数据,当流是通过 zip() 创建时,有一个元组入参,元组内元素代表 zip() 前的各个流中的元素。
  • flatMap(): 将流中的数据按照逻辑逐个映射一个新的六,新的流之间是异步的。
  • take(): 从流中获取 N 个元素,有多个扩展方法。
  • zipMap(): 将当前流和另一个流合并为一个流,两个流中的的元素一一对应。
  • mergeWith(): 将当前流和另一个流合并为一个流,两个流中的元素按照生成顺序合成,无对应关系。
  • join(): 将当前流和另一个流合并为一个流,流中的元素不是一一对应的关系,而是根据产生时间进行合并。
  • concatWith(): 将当前流和另一个流按声明顺序(不是元素的生成时间)链接在一起,保证第一个流消费完再消费第二流。
  • zipWith(): 将当前流和另一个流合并为一个新的流,这个流可以通过 Lambda 表达式设定合并逻辑,并且流中元素一一对应。
  • first(): 对于 Mono<T> 返回多个流中,第一个产生元素的 Mono<T>。对于 Flux<T>,返回多个 Flux<T> 流中第一个产生元素的 Flux<T>。
  • block(): Mono<T> 和 Flux<T> 中类似的方法,用于阻塞当前线程直到流中生成元素。
  • tolterable(): Flux<T> 方法,将 Flux<T> 生成的元素返回一个迭代器。
  • defer(): Flux<T> 方法,用于从一个 Lambda 表达式获取结果来生成 Flux<T> ,这个 Lambda 一般是线程阻塞的 buffer 相关方法,用于将流中的元素按照时间、逻辑规则分组为多个元素集合,并且这些元素集合组成一个元素类型为集合新流。
  • window(): 与 buffer() 类似,但是 window() 返回的流中元素类型还是流,而不是 buffer 的集合。
  • filter(): 顾名思义,返回负责规则的元素组成的新流。
  • reduce(): 用于将流中的各个元素与初始值(可以设置)逐一累积,最终得到一个 Mono<T>。
3)其他类型方法
  • duOnXXXX(): 当流发生 XXX 时的回调方法,可以有多个,类似于监听。XXX 包括 Subscribe、Next、Complete、Error 等。
  • onErrorResume(): 设置流发生异常时返回的发布者,此方法的 Lambda 时异常对象。
  • onErrorReturn(): 设置流发生异常时返回的元素,无法捕获异常。
  • then(): 返回 Mono<T>,跳过整个流的消费。
  • ignoreElements(): 忽略整个流中的元素。
  • subscribeOn(): 配合 Scheduler 使用,订阅时的线程模型。
  • publisherOn(): 配合 Scheduler 使用,发布时的线程模型。
  • retry(): 订阅者重试次数。
4)举个例子

场景: 假设有个名单列表,要根据 名单 获取对应名字的邮箱,并且过滤掉邮箱长度小于 10 的邮箱,最后再将符合条件的邮箱打印出来。

使用 stream 编程如下所示:

Stream.of("Tom", "Bob", "zhangsan", "lisi")
    .map(s -> s.concat("@qq.com"))
    .filter(s -> s.length() > 10)
    .forEach(System.out::println);

使用 Reactive 编程如下所示:

Flux.just("Tom", "Bob", "zhangsan", "lisi")
    .map(s -> s.concat("@hq.com"))
    .filter(s -> s.length() > 10)
    .subscribe(System.out::println);

通过上述例子可以看出,stream 和 Reactive 在形式上有相似之处,都是先创建数据源,然后经过中间过程处理转换,最后再消费中间处理结果。接下来我们逐行进行下解析:

  • Flux.just(“Tome”, “Bob”, “zhangsan”, “lisi”)

    Flux.just() 创建一个 Flux 的发布者。除了使用 just() 方法外,还有 fromCallable()fromIterable() 等其他方式用来从不同场景中创建 publisher。

  • map(s -> s.concat(“@qq.com”))

    map 的含义就是映射,在该步骤中将每个序列元素进行转换,在每个名称后面加上邮箱后缀。

  • filter(s -> s.length() > 10)

    过滤步骤,剔除掉长度不大于 10 的元素。

  • subscribe(System.out::println)

    该步骤是最终的订阅节点,之前创建的都是被观察者,该步骤是创建一个观察者 Subscriber。其中 Subscriber 的具体行为就是 System.out::println 打印出之前处理过的元素。

至此,一个订阅发布的过程就结束了。


四、Reactor 的工作原型

其实 反应式编程的核心就是一个观察者模式

Flux<T> 和 Mono<T> 相当于观察者模式中的 subject,当 Flux<T> 或 Mono<T> 调用 subscribe() 方法时,相当于 subject 发出了一个 Event,从而让订阅次事件的观察者进行消费。

那 Flux 框架具体是如何实现这套机制呢,还是上文中的例子,我们下面跟踪下它是如何工作的。

Flux.just("Tom", "Bob", "zhangsan", "lisi")
    .map(s -> s.concat("@qq.com"))
    .filter(s -> s.length() > 10)
    .subscribe(System.out::println);

reactor-core版本: 3.1.9.RELEASE

4.1 申明阶段

1)Flux.just()

进入 just() 方法,经过若干跳转后,进入如下方法:

public static <T> Flux<T> fromArray(T[] array) {
    if (array.length == 0) {
        return empty();
    }
    if (array.length == 1) {
        return just(array[0]);
    }
    return onAssembly(new FluxArray<>(array));
}

onAssembly 是一个钩子方法,暂时忽略。最终就是 new FluxArray<>(array) 一个对象创建出一个 FluxArray。点击 FluxArray 的构造函数中,可以看到,只是把 array 赋值给了对象内部的 array。

final T[] array;

@SafeVarargs
pubilc FluxArray(T... array) {
    this.array = Objects.requireNonNull(array, "array");
}
2)map()

Flux.just() 方法只是创建了一个 FluxArray 对象,回到最开始定义的地方,下一步执行的是 map 方法。定义如下所示:

public final <V> Flux<V> map(Function<? super T, ? extends V> mapper) {
    if (this instanceof Fuseable) {
        return onAssembly(new FluxMapFuseable<>(this, mapper));
    }
    return onAssembly(new FluxMap<>(this, mapper));
}

上一步创建的 FluxArray 是一个 Fuseable,所以执行 if 条件里的逻辑,创建一个 FluxMapFuseable 对象,FluxMapFuseable 的构建函数中有两个参数,this 和 mapper。

  • this 就是上一步创建出来的 FluxArray;
  • mapper 就是我们自定义的 Lambda 表达式,即:s -> s.concat(“@qq.com”)。

再点击进入 FluxMapFuseable 的构造函数中。

FluxMapFuseable(Flux<? extends source, Function<? super T, ? extends R> mapper) {
    super(source);
    this.mapper = Objects.requireNonNull(mapper, "mapper");
}

从这个构造函数可以看出,source 是上一步骤 just() 得到的 FluxArray,mapper 是对应 map 的 Lambda 表达式,所以当执行 map 操作的时候,其实是又将 FluxArray 进行封装,得到了一个新的 FluxMapFuseable 对象

3)filter()

再次回到开始的申明地方,在执行完 map() 操作后,接着执行 filter() 方法。同理,点击 filter() 方法,可以看到如下代码。

public final Flux<T> filter(Predicate<? super T> p) {
    if (this instanceof Fuseable) {
        return onAssembly(new FluxFilterFuseable<>(this, p));
    }
    return onAssembly(new FluxFilter<>(this, p));
}

在看过 map 的操作后,这一步骤其实就相当熟悉了,filter 步骤将上一步 map 操作得到的 FluxMapFuseable 方法又一次封装成了 FluxFilterFuseable 对象

4)申明总结

从上面的定义看看出,申明阶段就是一层一层的创建各种 Flux 对象,并没有实际执行任何操作。通过 just()、map()、filter() 等操作,将发布者一层一层的封装,从最开始的 FluxArray 对象,到 FluxMapFuseable 对象以及最后的 FluxFilterFuseable 对象。如下图所示:

在这里插入图片描述

4.2 订阅阶段

注意: Subscriber 类实际上对应的是 Reactive Stream 规范中的订阅者接口,它负责订阅并消费发布者发布的数据流。Subscriber 一词可以翻译成 “订阅者”,也可以翻译成 “观察者”,为方便理解,这里统一使用 “订阅者”。

1)subscribe()、onSubscribe()

上述例子中,just()、map()、filter() 只是创建了一个个的对象。并没有实际执行相关逻辑。当调用被观察者的 subscribe() 方法时,会为被观察者添加相应的观察者,同时触发观察者相关方法,从而整个观察者模式得以进行下去。接着看下 Flux 的 subscribe() 方法。经过一系列的 jump 后,最终会调用 Flux 的 subscribe() 方法。如下所示:

public abstract void subscribe(CoreSubscriber<? super T> actual);

该方法是一个抽象方法,需要看下子类是如何实现的。还记得上面申明阶段中 filter() 后产生的对象吗?FluxFilterFuseable 是 Flux 的一个具体实现,当调用 subscribe 后,会跳转到 FluxFilterFuseable 的 subscribe() 方法,代码如下:

/**
 * FluxFilterFuseable 的 subscribe()
 */
public viod subscribe(CoreSubscriber<? super T> actual) {
    // actual 是 System.out::println,封装成了 LambdaSubscriber 订阅者
    if (actual instanceof ConditionalSubscriber) {
        source.subscribe(new FilterFuseableConditionalSubscriber<>((ConditionalSubscriber<? super T>) actual, predicate)); // 第1处
        return;
    }
    // 走到这里,将 System.out::println 和 s->s.length>10 封装为一个新的订阅者。
    source.subscribe(new FilterFuseableSubscriber<>(actual, predicate)); // 第2处
}
  • 传进来的 actualSystem.out::println,也就是我们最终执行的表达式,它被封装成了一个 LambdaSubscriber订阅者

  • predicate 为 filter 指定的表达式 s -> s.length() > 10。

  • source 为上一步骤中生成的 FluxMapFuseable 对象。

    根据对象情况,代码会走到第2处,第2处的逻辑就是将 actual 和 predicate 封装成一个 订阅者 去订阅 source 也就是 FluxMapFuseable 对象。

接着代码会去调用 source 的 subscribe() 方法,也就是 FluxMapFuseable 对应的 subscribe() 方法:

/**
 * FluxMapFuseable 的 subscribe()
 */
public void subscribe(CoreSubscriber<? super R> actual) {
    // actual 是上一步新封装的 FilterFuseableSubscriber 订阅者
    if (actual instanceof ConditionalSubscriber) { // 第1处
        ConditionalSubscriber<? super R> cs = (ConditionSubscriber<? super R>) actual;
        source.subscribe(new MapFuseableConditionalSubscriber<>(cs, mapper));
        return;
    }
    // 走到这里,将 FilterFuseableSubscriber 和 s->s.concat("@qq.cmcc") 封装成新的订阅者
    source.subscribe(new MapFuseableSubscriber<>(actual, mapper)); // 第2处
}

代码还是会走到第2处,

  • 这里传入的 actual 是上一步骤中封装了 System.out::prinlns -> s.length() > 10 的订阅者,

  • mappers -> s.concat(“@qq.com”)

    从这段代码可以看出,所做的逻辑就是将上一步中的观察者和 mapper 又封装成了新的订阅者,一层一层的套娃。

最后,看下本步骤中的 source,也就是 FluxArray 对象的 subscribe() 方法:

/**
 * FluxArray 的 subscribe()
 */
public static <T> void subscribe(CoreSubscriber<? super T> s, T[] array) {
    if (array.length == 0) {
        Operators.complete(s);
        return;
    }
    // s 是上一步新封装的 MapFuseableSubscriber 订阅者
    if (s instanceof ConditionalSubscriber) { // 第1处
        s.onSubscribe(new ArrayConditionalSubscription<>((ConditionalSubscriber<? super T>) s, array));
    } else {
        // 走到这里,将 MapFuseableSubscriber 和 "Tom","Bob","zhangsan","lisi" 封装成订阅信息
        // 并且触发 “观察者模式”
        s.onSubscribe(new ArraySubscription<>(s, array)); // 第2处
    }
}

FluxArray 是数据的源头,

  • 传入的 array 为我们定义的 “Tom”,“Bob”,“zhangsan”,“lisi” 名字,

  • s 为上一步骤中创建的 subscriber订阅者。在数据的源头可以看出作为观察者模式的触发点,

    该步骤中触发了订阅者的 onSubscribe() 方法。同时为了解耦观察者和被观察者,创建一个 ArraySubscription 对象。FluxArray 的 subscribe() 会执行第2处代码,s.onSubscribe(new ArraySubscription<>(s, array)),这里的 s 是上一步骤中创建的 MapFuseableSubscriber 中的 onSubscribe() 方法,对应代码如下所示:

/**
 * MapFuseableSubscriber 的 onSubscribe()
 */
@Override
public void onSubscribe(Subscription s) {
    // s 是封装的 ArraySubscription 订阅信息
    if (Operators.validate(this.s, s)) {
        this.s = (QueueSubscription<T>) s;
        // actual 是 FilterFuseableSubscriber 对象
        actual.onSubscribe(this);
    }
}
  • actual 是 FilterFuseableSubscriber 对象,

    本质就是赋值后,然后调用 FilterFuseableSubscriber 的 onSubscribe() 方法。

FilterFuseableSubscriber 对应的 onSubscribe() 方法如下所示:

/**
 * FilterFuseableSubscriber 的 onSubscribe()
 */
@Override
public void onSubscribe(Subscription s) {
    // s 是封装的 ArraySubscription 订阅信息
    if (Operators.validate(this.s, s)) {
        this.s = (QueueSubscription<T>) s;
        // actual 是 LambdaSubscriber 对象
        actual.onSubscribe(this);
    }
}
  • 和 MapFuseableSubscriber 类似,actual 对应的是 LambdaSubscriber,也就是 System.out::println。

LambdaSubscriber 的 onSubscribe() 如下所示:

/**
 * LambdaSubscriber 的 onSubscribe()
 */
public final void onSubscribe(Subscription s) {
    // s 是封装的 ArraySubscription 订阅信息
    if (Operators.validate(subscription, s)) {
        this.subscription = s;
        if (subscriptionConsumer != null) {
            try {
                subscriptionConsumer.accept(s); // 第1处
            } catch (Throwable t) {
                Exceptions.throwIfFatal(t);
                s.cancel();
                onError(t);
            }
        } else {
            // 走到这里
            s.request(Long.MAX_VALUE); // 第2处
        }
    }
}

第1处和第2处代码的最终逻辑都一样,都会执行 request 方法。背压的原理就是通过这个 request 来实现的,观察者可以通过 request 来指定一次性订阅多少数据

总结一下:

  • 一个 subscribe() 方法其实是创建了三个订阅者,与创建发布者类似,创建的订阅者也是一层一层嵌套。从最外层的 Subscriber 与上一层的操作结合生成一个新的 Subscriber。再继续向上调用,最终调用到数据源头。然后从数据源头开始一层一层,最后再触发订阅者的 onSubscribe() 方法
2)request()

由上文可知,在 onSubscribe() 方法调用阶段最终会调用 s 的 request() 方法。s 就是封装的 ArraySubscription 订阅信息,用于解耦订阅者和被订阅者。

ArraySubscription 的 request() 如下所示:

/**
 * ArraySubscription 的 request()
 */
@Override
public void request(long n) {
    if (Operations.validate(n)) {
        if (Operators.addCap(REQUESTED, this, n) == 0) {
            if (n == Long.MAX_VALUE) {
                // 走到这里
                fastPath(); // 第1处
            } else {
                slowPath(); // 第2处
            }
        }
    }
}

void fastPath() {
    final T[] a = array;
    final int len = a.length;
    // s 是之前封装的 MapFuseableSubscriber 订阅者
    final Subscriber<? super T> s = actual;
    
    for (int i = index; i != len; i++) {
        if (cancelled) {
            return;
        }
        
        T t = a[i];
        
        if (t == null) {
            s.onError(new NullPointerException("The " + i + "th array element was null"));
            return;
        }
        
        s.onNext(t);
    }
    if (cancelled) {
        return;
    }
    s.onComplete();
}

直接看下 fastPath() 方法,代码都贴在上面了。到这里就真正开始消费了。通过一个 for 循环,调用 Subscriber 订阅者的 onNext() 方法,onNext() 方法执行完毕后,执行 Subscriber 的 onComplete() 方法。

  • 这里的 s 是 MapFuseableConditionalSubscriber,看下它的 onNext() 方法:
public void onNext(T t) {
    if (sourceMode == ASYNC) {
        actual.onNext(null);
    } else {
        if (done) {
            Operators.onNextDropped(t, actual.currentContext());
            return;
        }
        R v;
        
        try {
            v = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null value."); // 第1处
        } catch (Throwable e) {
            onError(Operators.onOperatorError(s, e, t, actual.currentContext()));
            return;
        }
        
        actual.onNext(v); // 第2处
    }
}
  • 在第1处执行 mapper 对应的 Lambda 表达式,
  • 在第2处执行下一步的 Subscriber 的 onNext() 方法。

下一步是 Filter,再下一步是最终的 System.out::println。最后 onNext() 都执行完成后,执行 s 的 onComplete() 方法,道理也是一样的,都是从最开始 Subscriber 的 onComplete() 方法一层一层执行。至此一个完整的观察者模式的执行情况就完成了。

3)订阅总结

订阅的整体流程图如下所示:

在这里插入图片描述

4.3 Reactor 的工作原理总结

  1. 申明阶段: 支持创建了一个个的被订阅者,把动作包装成对象,其他什么事都没做,直到调用被观察者的 subscribe() 方法,为被订阅者添加订阅者。
  2. 订阅阶段——subscribe()、onSubscribe(): 添加订阅者后,每一个申明步骤都会创建一个新的订阅者,订阅上个步骤的被订阅者,直到最外层被订阅者触发 onSubscribe() 方法,接着按照刚才添加的订阅者一层层调用对应的 onSubscribe() 方法,最后触发 request 方法。
  3. 订阅阶段——request(): 当触发到最外层的 request() 后,就执行真正的逻辑,再一层层调用订阅者的 onNext() 方法。最后完成后调用 onComplete() 方法。

五、补充

5.1 Spring Gateway、WebFlux、Reactor、Netty的依赖关系

springboot-webflux是一个引入了 spring-boot-starter-webflux 依赖的 Demo 项目,其中各个组件的依赖关系如下所示:

  1. Spring Cloud Gateway:
    • Spring Cloud Gateway 是一个基于 Spring 框架构建的 API 网关,用于微服务架构中的服务路由、过滤和安全控制等功能。
    • 为了实现高性能和异步非阻塞的特性,它选择基于 WebFlux 框架来构建。
  2. WebFlux:
    • Spring WebFlux 是 Spring 框架提供的一个完全非阻塞的、反应式编程模型的 Web 框架,适用于构建异步和事件驱动的 Web 应用程序。
    • WebFlux 可以运行在多种底层服务器之上,其中一个选项就是 Reactor Netty。
  3. Reactor:
    • Reactor 是一个反应式编程库,它是 Reactive Streams 规范的一个实现,提供了 Publishers、Subscribes、Processors 等组件,便于构建异步和数据流驱动的应用程序。
    • 在 Spring WebFlux 中,Reactor 用于构架和处理事件驱动的数据流,如 HTTP 请求和响应。
  4. Reactor Netty:
    • Reactor Netty 是结合了 Netty 和 Reactor 的项目,它将 Netty 的高性能网络能力与 Reactor 的反应式编程模型相结合,形成了一个高度优化的异步 HTTP 服务器和客户端实现。
    • Spring WebFlux 使用 Reactor Netty 作为其底层网络通信层,从而使得 WebFlux 能够在 Netty 的基础上高效地处理 HTTP 请求和响应。

整理完毕,完结撒花~🌻





参考地址:

1.什么是反应式编程(超详细说明),反应式编程和命令式编程的区别。如何使用Spring中的Reactor。Reactor中常用的操作。Mono和Flux。https://blog.csdn.net/qq_42799615/article/details/111235576

2.反应式编程入门及原理,https://juejin.cn/post/7034350525197860878

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1556505.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

kafka学习笔记02(小滴课堂)

Kafka命令行生产者发送消息和消费者消费消息实战 已存在的kafka不能重复创建。 broker设置的是1&#xff0c;factor大于broker了&#xff0c;所以报错。 生产者发送消息&#xff1a; kafka列表出现了新的kafka。 我们使用这个kafka。 我们启动消费者&#xff1a; 我们现在不从…

机器学习--支持向量机(通俗版本+demo)

场景 假设我们要在一个在线零售平台上自动区分商品评论是正面的还是负面的。评论中的语言多种多样&#xff0c;且往往含有大量的非结构化文本数据&#xff0c;直接使用简单的规则来分类是非常困难。这时候我们采取支持向量机算法来分类是一个比较好的选择。 支持向量机 支持…

Find Any File (FAF) for Mac:您的专属文件搜索神器

在数字时代&#xff0c;我们的Mac硬盘中堆积着各式各样的文件&#xff0c;从工作文档到家庭照片&#xff0c;从音乐视频到学习资料&#xff0c;无一不体现出我们的生活和工作的丰富多彩。然而&#xff0c;当我们需要快速找到某个特定文件时&#xff0c;却常常在茫茫文件海中迷失…

PostgreSQL到Doris的迁移技巧:实时数据同步新选择!

PostgreSQL可以说是目前比较抢手的关系型数据库了&#xff0c;除了兼具多样功能和强大性能之外&#xff0c;还具备非常优秀的可扩展性&#xff0c;更重要的是它还开源&#xff0c;能火不是没有理由的。 虽然PostgreSQL很强大&#xff0c;但是它也有短板&#xff0c;相对于专业…

DHT22温湿度传感器驱动代码(C8T6)

最近做了一个小项目&#xff0c;总结了一下温湿度传感器dht22的驱动代码&#xff0c;方便大家使用。实测可用&#xff0c;注意修改自己的引脚还有初始化gpio即可。 DHT22.c #include "DHT22.h" #include "gpio.h" #include "usart.h" #include &…

数据处理库Pandas数据结构DataFrame

Dataframe是一种二维数据结构&#xff0c;数据以表格形式&#xff08;与Excel类似&#xff09;存储&#xff0c;有对应的行和列&#xff0c;如图3-3所示。它的每列可以是不同的值类型&#xff08;不像 ndarray 只能有一个 dtype&#xff09;。基本上可以把 DataFrame 看成是共享…

STM32 直接修改寄存器来输出内部时钟的方法

1. 在特殊情况下使能 MCO 功能的方法 在对某些不容易复现的问题进行代码调时&#xff0c;需要观察内部时钟的情况&#xff0c;但往往代码之前并没有使能 MCO 功能&#xff0c;在这种情况下就可以使用寄存器直接配置来输出内部时钟到GPIO 脚位上进行观察和测试。 下面的例子就…

泰迪智能科技荣获山东省“技能兴鲁”职业技能大赛优秀组织奖

近日&#xff0c;泰迪智能科技荣获了山东省“技能兴鲁”职业技能大赛——第四届山东省“云数”技能竞赛“优秀组织单位”。 据悉&#xff0c;山东省“技能兴鲁”职业技能大赛——第四届山东省“云数”技能竞赛是由山东电子学会、山东省信息产业协会主办的&#xff0c;该竞赛是通…

产品之美10| 小小提示词(hint),便于用户交互

最近AIGC功能火热&#xff0c;有文生图和图生图两种。当用户初次接触到文生图的时候&#xff0c;会有一刻停顿&#xff1a;我该怎用输入呢&#xff1f;这时候的hint就可以发挥作用了&#xff1a; 编辑框&#xff08;EditView)里面有可爱的小女孩&#xff0c;加风格卡通。用户看…

面试题1(京东)之HiveSql --- 难度:入门初级

第1题 有如下的用户访问数据 userIdvisitDatevisitCountu012017/1/215u022017/1/236u032017/1/228u042017/1/203u012017/1/236u012017/2/218u022017/1/236u012017/2/224 要求使用SQL统计出每个用户的累积访问次数&#xff0c;如下表所示&#xff1a; 用户id月份小计累积u01…

Linux+ARM 简单环境检测---软件部分

1、前言 这个是我学习linuxARM的在做的第一个软硬件结合项目&#xff0c;以往的类似这种整体类项目还是光单片机的时候&#xff0c;linux软件部分学习了差不多快一年了&#xff0c;因为各种事情耽搁&#xff0c;这个项目一直没有静下心来完成&#xff0c;不过终于哈哈哈哈搞完了…

摸鱼的时候可以干什么?

可以做什么&#xff1f;有了VScode&#xff0c;你想做啥都可以&#xff01; 1、刷知乎 上班总是忍不住刷知乎&#xff0c;但是又怕领导发现&#xff0c;这个插件可以帮助你悄无声息地愉快地浏览知乎&#xff0c;有多少人是用这款插件看到的这篇文章&#xff1f; 你甚至可以用 M…

Java反序列化JDK动态代理的关系

Java代理模式 为什么要学习代理模式&#xff1f;了解开发原理&#xff0c;才能明白漏洞的产生。这不仅仅是SpringAOP的底层&#xff01; [SpringAOP 和 SpringMVC] 代理模式的分类&#xff1a; 静态代理动态代理 静态代理 角色分析&#xff1a; 抽象角色&#xff1a;一般会…

【vue】一个小bug和key的引入

点击master Vue!删除后该list后输入框中的Jerry消失了 原因&#xff1a;vue当你更改元素时会在真实的dom中渲染并更新list。这两个goal是两个dom元素,触发点击事件后,vue并不会删除第一个dom元素,而是把第二个dom元素的动态内容({{ goal }} - {{ index }})复制到第一个dom元素…

Logback日志框架(超详细)

logback-classic-1.2.3.jarhttp://链接: https://pan.baidu.com/s/1cA3gVB_6DEA-cSFJN6MDGw 提取码: sn8i 复制这段内容后打开百度网盘手机App&#xff0c;操作更方便哦 logback-core-1.2.3.jarhttp://链接: https://pan.baidu.com/s/19eCsvsO72a9PTqpXvXxrgg 提取码: 5yp…

JHY-31复合电压继电器 额定电压Un=110VDC 板后接线 JOSEF约瑟

用途&#xff1a; JHY-31复合电压继电器使用于电力系统的继电保护线路中&#xff0c;作为各种类型故障的判别元件和电压闭锁元件。 继电器型号名称&#xff1a; 例:辅助直流工作电压为110V的复合电压继电器的订货代号为: JHY-31/110V。 工作原理&#xff1a; 继电器内部具有负…

云架构(二) 大使模式

Ambassador pattern &#xff08;https://learn.microsoft.com/en-us/azure/architecture/patterns/ambassador&#xff09; 简单描述 创建一个助手服务&#xff0c;这个服务代表消费服务或者应用程序发送网络请求。大使服务可以看做是与客户机同一个位置的进程外代理。 这种…

【JDK常用的API】包装类

&#x1f36c; 博主介绍&#x1f468;‍&#x1f393; 博主介绍&#xff1a;大家好&#xff0c;我是 hacker-routing &#xff0c;很高兴认识大家~ ✨主攻领域&#xff1a;【渗透领域】【应急响应】 【Java】 【VulnHub靶场复现】【面试分析】 &#x1f389;点赞➕评论➕收藏 …

微服务(基础篇-007-RabbitMQ)

目录 初识MQ(1) 同步通讯&#xff08;1.1&#xff09; 异步通讯&#xff08;1.2&#xff09; MQ常见框架&#xff08;1.3&#xff09; RabbitMQ快速入门(2) RabbitMQ概述和安装&#xff08;2.1&#xff09; 常见消息模型&#xff08;2.2&#xff09; 快速入门&#xff…

精品丨PowerBI负载测试和容量规划

当选择Power BI作为业务报表平台时&#xff0c;如何判断许可证的选择是否符合业务需求&#xff0c;价格占了主导因素。 Power BI的定价是基于SKU和服务器内核决定的&#xff0c;但是很多IT的负责人都不确定自己公司业务具体需要多少。 不幸的是&#xff0c;Power BI的容量和预期…