【JavaWeb】Spring非阻塞通信 - Spring Reactive之WebFlux的使用

news2024/11/14 19:09:26

【JavaWeb】Spring非阻塞通信 - Spring Reactive之WebFlux的使用

文章目录

  • 【JavaWeb】Spring非阻塞通信 - Spring Reactive之WebFlux的使用
    • 参考资料
    • 一、初识WebFlux
      • 1、什么是函数式编程
        • 1)面向对象编程思维 VS 函数式编程思维(封装、继承和多态描述事物间的联系 VS 对运算过程(函数即变量间的映射关系)进行抽象)
        • 2)Java中的函数式编程(流式编程)
      • 2、对ChatGPT进行关于WebFlux的提问
        • 1)基本概念理解问题
        • 2)代码理解问题
    • 二、WebFlux基本概念
      • 1、WebFlux核心理念(Reactive 响应式宣言:快速响应,弹性伸缩,消息驱动)
      • 2、WebFlux特点与MVC的比较(异步非阻塞,去Servlet,函数式编程)
      • 3、WebFlux模块的整体架构(响应式模式 = 观察者模式,上下游数据传递:`Subscriber`调用`Publisher` 的 `subscribe()` 订阅数据;`Publisher` 调用 `Subscriber` 的 `onSubscribe()`传递数据(订阅媒介`Subscription`);`Subscriber`可以通过`Subscription#request`请求数据,通过`Subscription#cancel`取消数据发布等)
      • 4、WebFlux中关于Flux和Mono的介绍(`Flux`和 `Mono` 都是数据流的发布者,可以发出元素值,错误信号,完成信号)
    • 二、WebFlux的基本使用
      • 1、WebFlux的使用 - Flux / Mono / 操作符
        • 1)WebFlux如何接入到Springboot项目
        • 2)快速上手Reactive
        • 3)Flux的使用(常用方法:`just()`,`fromArray(),fromIterable()`和 `fromStream()`,`empty()`,`error()`,`never()`,`range(int start, int count)`,`interval(Duration period)` 和 `interval(Duration delay, Duration period)`)
        • 4)Mono的使用(常用方法:`fromCallable()`、`fromCompletionStage()`、`delay(Duration duration)`、`ignoreElements(Publisher source)`、`justOrEmpty(Optional<? extends T> data)`)
        • 5、操作符的使用(`buffer`,`bufferTimeout`,`bufferUntil`,`bufferWhile`,`filter`,`window`,`zipWith`,`BiFunction`,`take`,`reduce`,`reduceWith`,`merge`,`mergeSequential`,`flatMap`,`flatMapSequential`,`concatMap`,`combineLatest`)
        • 6)其他说明
      • 2、WebFlux响应式编程说明(容易踩的坑、以及编程规范)(代码待实践验证)
        • 1)基本概念
        • 2)选择合适的操作符(响应式/非响应式方法,比如`flatMap`是响应式方法,`doOnNext`不是响应式方法,因为返回值类型是`Consumer<T>`)
        • 3)null处理
        • 4)非阻塞与阻塞
        • 5)上下文
        • 6)常见问题
          • a)问题一:我写的操作看上去是正确的,但是没有执行
          • b)问题二:**我想获取流中的元素怎么办 - 不要试图取值**
          • c)问题三:我需要在非响应式方法中使用响应式怎么办?

参考资料

官方文档

  • Spring Reactive官方文档
  • Reactor 3 Reference Guide(关于为什么会有这个WebFlux框架有很详细的介绍)
  • Spring Reactor-core 3.6.4官方文档
  • Reactive Streams 官方介绍
  • Spring Reactive Stream API
  • Spring WebFlux 官方文档

博客

  • WebFlux响应式框架快速入门
  • Flux中的map、flatMap、concatMap的区别
  • WebFlux 响应式编程介绍及简单实现
  • 响应式编程入门之 Project Reactor
  • Project Reactor 响应式编程
  • 响应式编程说明

视频

  • 【Spring Webflux】深入Spring5新特性Webflux模块详解

一、初识WebFlux

1、什么是函数式编程

1)面向对象编程思维 VS 函数式编程思维(封装、继承和多态描述事物间的联系 VS 对运算过程(函数即变量间的映射关系)进行抽象)

参考 https://blog.csdn.net/weixin_38255079/article/details/122437312

函数式编程(Functional Programming, FP),FP 是编程范式之一,我们常听说的编程范式还有面向过程编程、面向对象编程。

  • 面向对象编程的思维方式:把现实世界中的事物抽象成程序世界中的类和对象,通过封装、继承和多态来演示事物事件的联系
  • 函数式编程的思维方式:把现实世界的事物和事物之间的联系抽象到程序世界(对运算过程进行抽象
    • 程序的本质:根据输入通过某种运算获得相应的输出,程序开发过程中会涉及很多有输入和输出的函数
      x -> f(联系、映射) -> yy=f(x)
    • 函数式编程中的函数指的不是程序中的函数(方法),而是数学中的函数即映射关系,例如:y = sin(x),x和y的关系
    • 相同的输入始终要得到相同的输出(纯函数)
    • 函数式编程用来描述数据(函数)之间的映射
2)Java中的函数式编程(流式编程)
  • 常见的函数式编程接口:SupplierConsumerFunctionUnaryOperatorBiFunction

    public class LambdaDemo2 {
    
        public static void main(String[] args) {
            /**
                * 常见的函数式编程接口
                */
    
            /**
                * Supplier: 没有输入 只有一个输出
                */
            Supplier<String> supplier = () -> {return "Supplier:" + "Hello world";};
            System.out.println(supplier.get());
    
            /**
                * Consumer: 只有一个输入 没有输出
                */
            Consumer<String> consumer = (e) -> {
                System.out.println("Consumer:" + e);
            };
            consumer.accept("Hello world");
    
            /**
                * Function: 输入T 输出R
                */
            Function<String,String> function = (e) -> {
                return "Function:" + e;
            };
            System.out.println(function.apply("Hello world"));
    
            /**
                * UnaryOperator: 输出输入都是T
                */
            UnaryOperator<String> unaryOperator = (e) -> {
                return "UnaryOperator:" + e;
            };
            System.out.println(unaryOperator.apply("Hello world"));
    
            /**
                * BiFunction: 输入T,U 输出R
                */
            BiFunction<Integer,Integer,String> BiFunction = (x,y) -> {
                return "BiFunction:" + (x + y);
            };
            System.out.println(BiFunction.apply(1,2));
        }
    }
    
    
  • 流式编程的中间操作(可以执行多次)和终止操作(只能执行1次)

    在这里插入图片描述

  • 函数式编程小例子:

    public class LambdaDemo1 {
    
        public static void main(String[] args) {
    
            /**
                * 通过流式编程,从如下数组中提取得到 belo
                * 流程如下:bo_le -> bole -> b,o,l,e -> belo
                */
            String arr[] = new String[]{"bo_le", "bo_le", "ba*ll", "ba(e", "ba*e",};
    
            Optional<String> first = Stream.of(arr)
                    .filter(e -> {
                        return e.equals("bo_le");
                    })
                    .distinct()
                    .map(e -> e.replaceAll("_", ""))
    //                .map(e -> Stream.of(e.split("")))
                    .flatMap(e -> Stream.of(e.split("")))
                    .sorted()
                    .findFirst();
            System.out.println(first.get());
    
            /**
                * Note:
                * map(Function f)
                *     接收一个函数作为参数,该函数会被应用到每个元素上,并将其映射成一个新的元素,返回的数据还是一个流。
                * flatMap(Function f)
                *     接收一个函数作为参数,将流中的每个值都换成另一个流,然后把所有流连接成一个流。
                *
                * 比如上面的flatMap是将Stream<String[]>转换为Stream<String>
                *     map是将Stream<String>转换成Stream<String[]>
                *
                * 参考 https://www.cnblogs.com/bigjor/p/17009630.html
                */
        }
    }
    

2、对ChatGPT进行关于WebFlux的提问

1)基本概念理解问题
  • 问题1:什么是背压?- 处理生产者消费者速度不匹配的问题

    背压(Backpressure)是在响应式编程中用来处理生产者和消费者之间速度不匹配的情况的一种策略。在异步数据流中,如果生产者产生的事件或数据速度过快,而消费者的处理速度跟不上,就可能导致数据积压,最终引发问题。

    背压机制的主要目标是协调生产者和消费者之间的速度,以避免资源耗尽、内存溢出等问题。有几种常见的处理背压的策略:

    1. 缓冲(Buffering): 数据流中的元素被缓存在一个有限大小的缓冲区中,当缓冲区满时,生产者可能需要等待,或者新的元素可能会被抛弃。
    2. 丢弃(Dropping): 当消费者处理速度不足以跟上生产者时,可以选择丢弃一些元素,以保持整体的流畅性。这可能会导致部分数据的丢失。
    3. 最新元素(Latest Element): 只保留最新产生的元素,丢弃之前的元素。这样确保消费者获得的是最新的数据。
    4. 异常(Error): 当出现背压情况时,直接抛出异常,中止数据流。

    在背压的处理中,要根据具体的应用场景和需求选择合适的策略。不同的响应式编程框架和库提供了不同的背压处理机制,如 Reactor 中的 onBackpressureBuffer、RxJava 中的 onBackpressureBuffer 等。

  • 问题2:那生产者是如何产生元素的呢?产生的元素是怎样被订阅到的呢?- 通过回调函数来订阅或通知

    在响应式编程中,生产者通常是通过生成异步事件或者数据来产生元素。这些异步事件可以是来自外部系统、用户输入、定时器、网络请求等。

    在 Reactor 中,生产者主要使用 MonoFlux 这两个类来表示。

    1. Mono(单值): 用于表示包含零个或一个元素的数据流。可以通过 Mono.just(value) 创建一个包含单个元素的 Mono。

      Mono<String> mono = Mono.just("Hello, World!");
      
    2. Flux(多值): 用于表示包含零个、一个或多个元素的数据流。可以通过 Flux.just(value1, value2, ...) 创建一个包含多个元素的 Flux。

      Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);
      

    这些生产者创建的数据流可以通过订阅者(Subscriber)进行订阅。订阅者是实现了 Subscriber 接口的对象,通过实现接口的方法,可以定义对数据流中元素的处理方式。

    以下是一个简单的例子,展示了如何使用 Reactor 中的 MonoFlux 创建生产者,并通过订阅者进行订阅:

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    
    public class ReactorExample {
        public static void main(String[] args) {
            // 创建一个包含单个元素的 Mono
            Mono<String> mono = Mono.just("Hello, World!");
    
            // 创建一个包含多个元素的 Flux
            Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);
    
            // 订阅 Mono,并定义订阅者的行为
            mono.subscribe(
                value -> System.out.println("Mono Received value: " + value),
                error -> System.err.println("Mono Error: " + error),
                () -> System.out.println("Mono Completed")
            );
    
            // 订阅 Flux,并定义订阅者的行为
            flux.subscribe(
                value -> System.out.println("Flux Received value: " + value),
                error -> System.err.println("Flux Error: " + error),
                () -> System.out.println("Flux Completed")
            );
        }
    }
    

    在这个例子中,通过 Mono.just 创建了一个包含单个元素的 Mono,通过 Flux.just 创建了一个包含多个元素的 Flux。然后通过 subscribe 方法订阅这两个生产者,传递了实现了 Subscriber 接口的对象,定义了对数据流中元素的处理方式。

  • 问题3:响应式编程和传统编程是不是只在设计理念上有所不同,在压测下最终的效果都差不多?- 响应式编程适合并发实时性要求高、事件驱动的场景,而传统编程更适合简单、同步的应用场景

    响应式编程和传统编程在设计理念和编码模型上确实有很大的不同,主要是由于响应式编程旨在处理异步和事件驱动的场景。虽然在某些情况下它们可以达到相似的效果,但在特定的应用场景中,响应式编程具有独特的优势。

    1. 异步处理: 响应式编程更强调异步和非阻塞的操作,适用于处理事件流、用户界面交互、网络请求等异步场景。传统编程往往以同步的方式进行数据处理,可能需要使用回调或者多线程来实现异步操作。
    2. 背压处理: 响应式编程通过背压机制来处理生产者和消费者速度不匹配的问题,确保系统在高负载情况下能够优雅地处理数据流。传统编程中可能需要手动处理这些情况,而响应式编程提供了一些内置的机制。
    3. 组合性: 响应式编程强调数据流的组合性,通过操作符来构建复杂的数据处理逻辑。这使得编写具有高度可组合性的代码更为容易。传统编程通常采用命令式的风格,可能需要更多的代码来实现相同的逻辑。
    4. 反应式流式处理: 响应式编程通过流式处理的方式更容易处理数据流,使得代码更具表达性和简洁性。在传统编程中,可能需要使用循环、迭代等结构来处理流程。

    在一些高并发、实时性要求高、事件驱动的场景下,响应式编程的优势更为明显。然而,在某些简单、同步的应用场景中,传统编程可能更为直观和容易理解。对于性能问题,具体的影响可能要根据具体的实现和使用场景来评估,而不是简单地归结为响应式编程和传统编程的选择。

  • 问题4:那reactor projector是如何实现异步处理的?是不使用Tomcat服务器,使用Netty服务器的缘故吗?

    Reactor Projector 是 Reactor 生态系统中的一个模块,用于提供反应式流(Reactive Streams)的实现。它的异步处理是通过使用 Reactor 的核心模块提供的反应式编程特性来实现的。

    Reactor Projector 主要建立在 Reactor Core 之上,而 Reactor Core 是 Reactor 生态系统的核心库,提供了一套用于构建反应式应用程序的基础设施。Reactor Core 中包含了 Flux、Mono 等类,用于表示异步数据流和异步单值。

    Reactor 的异步处理中,关键的组件是 Scheduler,它负责管理并发执行异步任务。Reactor 使用 Netty、Project Reactor 的 Reactor Netty、以及其他一些调度器来提供异步执行的能力。这些调度器允许你指定在哪个线程上执行数据流的处理,以及如何处理并发的问题。

    在 Web 应用程序中,Netty 是一个高性能的异步事件驱动的网络框架,被用于构建响应式的、非阻塞的网络应用。因此,当使用 Reactor Projector 来构建反应式 Web 应用时,它可能倾向于使用 Netty 作为底层服务器,以便更好地适应异步和非阻塞的要求。

    在传统的 Servlet 容器(如 Tomcat)中,处理方式可能更偏向于阻塞式的同步处理,因此 Reactor 在这样的环境中可能不如在异步的 Netty 中表现出色。不过,也可以在传统的 Servlet 容器中使用 Reactor,但可能需要一些适配工作。

    总体来说,Reactor Projector 和 Reactor Core 提供了一套用于构建异步、响应式应用程序的工具和模块,而底层的异步处理是由调度器和特定的底层实现(如 Netty)来支持的

2)代码理解问题
  • 问题1:Reactor projector中关于flux.subscribe订阅的是什么数据?

    在 Reactor Projector 中,Flux 是一个表示异步多值序列的类。通过 Flux,你可以创建一个数据流,并通过 subscribe 方法添加订阅者,让其对数据流中的元素进行消费。

    Flux 表示的数据流可以包含来自不同源的多个元素,并且可以在异步的情况下进行处理。当你调用 subscribe 方法时,你可以传递一个订阅者对象,该订阅者对象定义了如何处理数据流中的元素、错误以及完成信号。

    具体来说,在 Reactor 中,Flux.subscribe 方法的订阅者(Subscriber)接口如下:

    public interface Subscriber<T> {
        void onSubscribe(Subscription s);
        void onNext(T t);
        void onError(Throwable t);
        void onComplete();
    }
    
    • onSubscribe(Subscription s): 当订阅发生时,会调用这个方法,传递一个 Subscription 对象,通过它可以请求数据或者取消订阅。
    • onNext(T t): 当新的元素可用时,会调用这个方法,传递新的元素。
    • onError(Throwable t): 当发生错误时,会调用这个方法,传递错误信息。
    • onComplete(): 当数据流完成时,会调用这个方法。

    当你使用 Flux.subscribe 订阅一个 Flux 时,你需要提供一个实现了 Subscriber 接口的对象,该对象定义了对数据流中不同事件的处理方式。

    下面是一个简单的示例,展示了如何使用 Flux 创建一个数据流,并通过订阅者处理其中的元素:

    import reactor.core.publisher.Flux;
    
    public class FluxExample {
        public static void main(String[] args) {
            Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);
    
            // 订阅并定义订阅者的行为
            flux.subscribe(
                value -> System.out.println("Received value: " + value),
                error -> System.err.println("Error: " + error),
                () -> System.out.println("Completed")
            );
        }
    }
    

    在这个例子中,flux.subscribe 的三个参数分别对应订阅者的 onNextonErroronComplete 方法,定义了对数据流中元素的处理方式。

  • 问题2:在 Reactor Projector 库中,sinks有什么作用?

    在 Reactor Projector 库中,Sinks 是用于创建不同种类的 Sinks(可观察对象的生产者)的工厂类。Sinks 提供了一组方法,允许你创建不同类型的 Sinks,包括单值和多值的 Sinks。这些 Sinks 可以被用于创建 Flux 或 Mono 对象,从而构建响应式数据流。

    其中,Sinks.many() 方法用于创建多值的 Sink,而 Sinks.one() 方法用于创建单值的 Sink。这两种类型的 Sink 分别对应于多值的数据流和单值的数据流。在这些 Sinks 上,你可以进行一系列的操作,例如添加元素、触发错误、完成流等。

    下面是一个简单的例子,演示了如何使用 Sinks.many() 创建一个 Flux:

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    
    public class SinksExample {
        public static void main(String[] args) {
            // 创建一个多值的 Sink
            Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
    
            // 使用 Sink 创建 Flux
            Flux<String> flux = sink.asFlux();
    
            // 向 Sink 中添加元素
            sink.tryEmitNext("Element 1");
            sink.tryEmitNext("Element 2");
    
            // 订阅 Flux,处理元素
            flux.subscribe(System.out::println);
    
            // 向 Sink 中添加更多元素
            sink.tryEmitNext("Element 3");
            sink.tryEmitNext("Element 4");
    
            // 完成 Sink(结束数据流)
            sink.tryEmitComplete();
        }
    }
    

    在这个例子中,通过 Sinks.many() 创建了一个多值的 Sink,然后通过 sink.asFlux() 将其转换为 Flux。之后,通过 sink.tryEmitNext() 向 Sink 中添加元素,通过订阅 Flux 来处理这些元素。最后,通过 sink.tryEmitComplete() 完成 Sink,结束数据流。

  • 问题3:在react projector库中下面这代码是什么意思:Sinks.Many processor = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);?

    在 Reactor Projector 库中,这段代码是用于创建 FluxSink 的配置。让我们逐步解释:

    1. Sinks.many(): 这是用于创建多值(多元素)FluxSink的方法。Sinks 是 Reactor 提供的用于创建不同种类的 Sinks(可观察对象的生产者)的工厂类。many() 表示创建一个多值的 Sink。
    2. multicast(): 这是一个操作符,用于将 FluxSink 转换为一个可以被多个订阅者(Subscribers)共享的多播(multicast)的 Flux。这样,多个订阅者可以共享相同的数据流。
    3. onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false): 这是配置背压(backpressure)策略的部分。在反应式编程中,当生产者速度超过消费者速度时,就会出现背压。在这里,onBackpressureBuffer 用于定义缓冲区的大小和是否使用异步线程来处理溢出。
      • Queues.SMALL_BUFFER_SIZE: 这是用于定义缓冲区大小的常量。在这里,使用了一个小型缓冲区大小,表示缓冲区能够存储的元素数量有限。
      • false: 表示不使用异步线程来处理溢出。当缓冲区满了,新的元素就会被抛弃,而不是等待缓冲区有空间。

    综合起来,这段代码的目的是创建一个多值的 FluxSink,并将其转换为一个多播的 Flux,同时配置了一个缓冲区,当订阅者处理速度不足以跟上生产者时,使用缓冲区来缓存元素,当缓冲区满了时,新的元素会被抛弃。

  • 问题4:那processor.asFlux();这个方法有什么作用?

    processor.asFlux() 方法用于将创建的 Sinks.Many 转换为 Reactor 的 Flux 对象。在 Reactor 中,Flux 是表示多值异步序列的主要类,而 Sinks.Many 是用于生产多值的数据流的生产者。

    通过 processor.asFlux(),你可以将 Sinks.Many 转换为 Flux,然后将其用于订阅者消费数据流。这样,你可以使用 Reactor 提供的各种操作符和方法来处理这个数据流,例如 mapfilterflatMap 等。

    举例来说,可以像下面这样使用 processor.asFlux()

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    
    public class ProcessorExample {
        public static void main(String[] args) {
            // 创建一个多值的 Sink
            Sinks.Many<String> processor = Sinks.many().multicast().onBackpressureBuffer();
    
            // 转换为 Flux
            Flux<String> flux = processor.asFlux();
    
            // 订阅 Flux,处理元素
            flux.subscribe(System.out::println);
    
            // 向 Sink 中添加元素
            processor.tryEmitNext("Element 1");
            processor.tryEmitNext("Element 2");
    
            // 完成 Sink(结束数据流)
            processor.tryEmitComplete();
        }
    }
    

    在这个例子中,通过 processor.asFlux()Sinks.Many 转换为 Flux,然后通过 flux.subscribe(System.out::println) 订阅这个 Flux,最后通过 processor.tryEmitNextSinks.Many 添加元素。这样就可以通过 Flux 的订阅者机制来消费数据流。

二、WebFlux基本概念

Spring Reactive官方介绍如下:https://spring.io/reactive

Reactive systems have certain characteristics that make them ideal for low-latency, high-throughput workloads. Project Reactor and the Spring portfolio work together to enable developers to build enterprise-grade reactive systems that are responsive, resilient, elastic, and message-driven.

Reactive的技术栈如下:

The Spring portfolio provides two parallel stacks. One is based on a Servlet API with Spring MVC and Spring Data constructs. The other is a fully reactive stack that takes advantage of Spring WebFlux and Spring Data’s reactive repositories. In both cases, Spring Security has you covered with native support for both stacks.

在这里插入图片描述

反应式技术组件的关系WebFlux + SpringBoot + r2dbc,底层用到了JDK 8的函数式编程

在IOT平台中,使用webFlux非阻塞通信(使用**Netty容器**而非Tomcat容器,不使用Servlet API),Spring WebFluxSpring MVC是同级别的,如果共存则优先使用SpringMVC

1、WebFlux核心理念(Reactive 响应式宣言:快速响应,弹性伸缩,消息驱动)

参考WebFlux响应式框架快速入门,

响应式编程顾名思义就是在于响应二字,我们需要在某个事件发生时做出响应。

这里提一个著名的设计原则:好莱坞原则(Hollywood principle)

Don't call us, we will call you.

演员提交简历之后,回家等着就好,演艺公司会主动打电话给你。


传统的Spring MVC是基于Servlet API的框架,Spring WebFlux是一套全新的Reactive Web技术栈,实现完全非阻塞,支持Reactive Streams背压等特性,并且运行环境不限于Servlet容器(Tomcat、Jetty、Undertow),如Netty等

“Reactive”(响应式宣言)是WebFlux的核心目标,它是指,应用对变化(负载的变化、外部服务可用性的变化等)做出即时响应,使得应用保持正常。
根据响应式宣言(The Reactive Manifesto),响应式系统具备以下特点:

  1. 快速响应(Responsive)
    系统在各种情况下都会尽全力保证及时响应。它是可用性和实用性的基础, 还意味着问题能被迅速发现,并得到有效处理。
  2. 回弹性(Resilient)
    系统在面临故障时依然保持快速响应。它是通过**复制(replication)、抑制(containment)、隔离(isolation)和委托(delegation)**来实现的:故障被抑制在单个组件中, 且各组件相互隔离, 使系统在部分失败和恢复时, 可以不影响整体的功能。每个组件的恢复都委托给另一个(外部)组件, 高可用在必要时通过复制来保证。
    3.可伸缩性(Elastic)
    系统在不同的负载下都能保持快速的响应。响应式系统可以根据服务的请求量, 动态增加或减少相应的资源
  3. 消息驱动(Message Driven)
    响应式系统依赖异步的消息传递(信号量), 以确定各种组件的边界, 并确保松耦合(loose coupling)、隔离性(isolation)、位置透明性(location transparency), 并提供将错误封装为消息的手段。
    在这里插入图片描述

2、WebFlux特点与MVC的比较(异步非阻塞,去Servlet,函数式编程)

在这里插入图片描述

  1. 异步和非阻塞:支持了应用层的异步和底层的IO非阻塞;

  2. 函数式编程:使用Lambda表达式和函数式接口来定义请求处理程序。

  3. 去servlet:允许可以不基于servlet API。默认的Netty容器,不基于Servlet API。

Spring WebFluxSpring MVC的比较(通义千问)

Spring MVCSpring WebFluxSpring框架中用于构建Web应用程序的两种不同模型,它们在设计理念、并发模型以及对资源利用上有所不同。

Spring MVC (Model-View-Controller)

  1. 同步阻塞模型:基于Servlet API,遵循传统的请求/响应模式,每个请求会创建一个新的线程来处理,直到响应被完全生成并发送给客户端。
  2. 架构:MVC架构清晰,易于理解和实现。它包含控制器(Controller)、模型对象(Model)和视图(View),适合构建基于HTTP的同步RESTful服务。
  3. 资源利用:对于短时且密集型I/O操作不是特别高效,因为每个请求都会占用一个线程直到完成,可能会导致大量的线程消耗。
  4. 适用场景:适用于大部分传统Java Web应用,尤其是需要与大量现有Servlet容器集成、依赖同步API或者对异步编程不敏感的应用。

Spring WebFlux

  1. 非阻塞、反应式编程模型:基于ReactorProject Reactor库实现,支持异步非阻塞I/O,可以在单个线程上处理多个请求,从而提高服务器的并发性能和资源利用率
  2. 架构:采用函数式编程风格,使用HandlerFunctionRouterFunction等组件进行路由和处理请求。可以结合Reactive Streams规范处理流式数据。
  3. 资源利用:非常适用于高并发、低延迟的场景,如实时分析、物联网设备数据处理等,因为它可以避免不必要的线程上下文切换,并在较少的线程上处理更多的请求。
  4. 适用场景:适用于云原生环境、微服务架构中的高并发场景,尤其当系统设计倾向于事件驱动和反应式编程时。

总结:

  • Spring MVC更适合传统的多线程同步服务开发,兼容性好,生态广泛,易用性强。
  • Spring WebFlux则适用于现代的、异步和非阻塞的服务设计,能够充分利用底层操作系统和硬件的性能,在处理大量并发请求时表现更优,但可能需要开发者具备一定的响应式编程基础。

3、WebFlux模块的整体架构(响应式模式 = 观察者模式,上下游数据传递:Subscriber调用Publishersubscribe() 订阅数据;Publisher 调用 SubscriberonSubscribe()传递数据(订阅媒介Subscription);Subscriber可以通过Subscription#request请求数据,通过Subscription#cancel取消数据发布等)

Project Reactor模块架构 参考 响应式编程入门之 Project Reactor
响应式可以理解为观察者模式,通过订阅和发布数据流中的数据对数据进行处理。 在 Reactor 中大部分实现都是按照下图的逻辑来执行的:

  • 1)首先是Subscriber订阅者)通过调用 Publishersubscribe 方法,主动订阅 Publisher发布者
  • 2)Publisher 在向下游发送数据之前,会先调用 SubscriberonSubscribe 方法,传递的参数为 Subscription订阅媒介
  • 3)Subscriber 通过 Subscription#request 来请求数据,或者 Subscription#cancel取消数据发布(这就是响应式编程中的背压,订阅者可以控制数据发布)
  • 4)Subscription 在接收到订阅者的调用后,通过 Subscriber#onNext 向下游订阅者传递数据
  • 5)在数据发布完成后,调用 Subscriber#onComplete 结束本次流,如果数据发布或者处理遇到错误会调用 Subscriber#onError

在这里插入图片描述

调用 Subscriber#onNext,onComplete,onError 这三个方法,可能是在 Publisher 中做的,也可能是在 Subscription 中做的,根据不同的场景有不同的实现方式,并没有什么严格的要求。可以认为 Publisher 和 Subscription 共同配合完成了数据发布

Note:java中生产消费者模式和观察者模式有啥区别 参考https://worktile.com/kb/ask/38120.html

概念不同:

  • 生产消费者模式就是一个多线程并发协作的模式。在这个模式中,一部分线程被用于去生产数据,另一部分线程去处理数据,于是便有了形象的生产者与消费者了。
  • 观察者模式是一种对象行为模式,它定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并被自动更新。

编程范式不同:

  • 生产者消费者模式本身并不属于设计模式中的任何一种;
  • 观察者模式属于Gang of Four提出的23种设计模式中的一种,也是面向对象的设计模式中的一种。

4、WebFlux中关于Flux和Mono的介绍(FluxMono 都是数据流的发布者,可以发出元素值,错误信号,完成信号)

参考WebFlux 响应式编程介绍及简单实现

Reactor提供了2种返回类型,Mono和Flux。

  • Mono0…1个数据
  • Flux0…N个数据

使用 Flux 和 Mono 都可以发出三种数据信号:Flux 和 Mono 都是数据流的发布者,使用 Flux 和 Mono 都可以发出三种数据信号:元素值,错误信号,完成信号错误信号和完成信号都代表终止信号,终止信号用于告诉订阅者数据流结束了,错误信号终止数据流同时把错误信息传递给订阅者。
三种信号的特点:

  • 1)错误信号和完成信号都是终止信号,不能共存
  • 2)如果没有发送任何元素值,而是直接发送错误或者完成信号,表示是空数据流
  • 3)如果没有错误信号,也没有完成信号,表示是无限数据流

jdk8想使用flux需要引入依赖:org.projectreactor.reactor-core

  • just():创建Flux序列,并声明指定数据流;
  • subscribe():订阅Flux序列,只有进行订阅后才回触发数据流,不订阅就什么都不会发生;

关于Project Reactor 响应式编程中Mono,Flux,操作符的使用 参考 Project Reactor 响应式编程

  • 反应式技术组件的关系:WebFlux + SpringBoot + r2dbc,底层用到了JDK 8的函数式编程

    在这里插入图片描述

    响应式编程是观察者模式,订阅者和生产者之间通过设置缓存队列的大小来实现背压

    在这里插入图片描述

    Reactive Stream组件模型中Processor即作为上游节点的订阅者,也作为下游节点的发布者,起到数据转换、过滤的作用。

    在这里插入图片描述

二、WebFlux的基本使用

1、WebFlux的使用 - Flux / Mono / 操作符

参考 Project Reactor 响应式编程

1)WebFlux如何接入到Springboot项目
  • 如果你是springboot的WebMVC项目,很容易就可以改为WebFlux项目:

    pom中引入spring-boot-starter-webflux即可使用WebFlux,注意不要引入webMVC,否则会走spring MVC,如果走webflux,启动会打印NettyWebServer

  • 和springMVC的用法一定程度兼容

    1)支持spring的各种注解

    @Controller
    @RequestMapping
    @ResponseBody
    

    2)允许返回非Mono、非Flux类型,但这样就是阻塞代码了。

    3)非阻塞返回Mono、Flux类型

    4)HttpServletRequest变成了ServerHttpRequest,response也是类似的变化。

2)快速上手Reactive

参考【Spring Webflux】深入Spring5新特性Webflux模块详解

  • 例子1 - 响应式编程基本流程(创建发布者,创建订阅者,建立两者关系,发布者发布数据)

    public static void main(String[] args) {
            // Step1: 创建发布者 (直接创建Flux/Mono等发布者,同时会以流的形式发送数据)
    //        Publisher publisher = new Publisher() {
    //            @Override
    //            public void subscribe(Subscriber subscriber) {
    //                System.out.println("订阅者完成订阅");
    //            }
    //
    //            public void submit(String data){
    //                System.out.println("开始发送数据:" + data);
    //            }
    //        };
            // Step1: 创建发布者并发布数据(找不到SubmissionPublisher类,直接创建Flux/Mono等发布者,同时会以流的形式发送数据)
            Flux<String> stringFlux = Flux.just("Hello","World");
    
            // Step2: 创建订阅者
            Subscriber subscriber = new Subscriber() {
    
                Subscription subscription;
                @Override
                public void onSubscribe(Subscription subscription) {
                    System.out.println("开始建立订阅关系");
                    this.subscription = subscription;
                    subscription.request(1);  // 第一次需要
                }
    
                @Override
                public void onNext(Object o) {
                    System.out.println("开始接收数据:" + o);
                    // 业务处理
                    subscription.request(10); // 背压
                }
    
                @Override
                public void onError(Throwable throwable) {
                    System.out.println("接收了错误数据");
                }
    
                @Override
                public void onComplete() {
                    System.out.println("已成功接收完数据");
                }
            };
    
            // Step3: 建立订阅关系
            stringFlux.subscribe(subscriber);
        }
        
    ---
    开始建立订阅关系
    开始接收数据:Hello
    开始接收数据:World
    已成功接收完数据
    
    
  • 例子2 - 使用Processor中间处理角色的响应式编程Processor即作为上游节点的订阅者,也作为下游节点的发布者,起到数据转换、过滤的作用)

    /**
            * Processor和发布者建立订阅关系,并将信息转发给订阅者
            */
        public static void main(String[] args) {
    
            // Step1: 创建订阅者(找不到SubmissionPublisher类,直接创建Flux/Mono等发布者,同时会以流的形式发送数据)
            Flux stringFlux = Flux.just("Hello", "World");
    
            // Step2: 创建Processor
            Processor processor = new Processor<String,String>(){
    
                Subscription subscription;
    
                @Override
                public void onSubscribe(Subscription subscription) {
                    System.out.println("【Processor】开始建立与发布者的订阅关系");
                    this.subscription = subscription;
                    subscription.request(1);  // 第一次需要
                }
    
                @Override
                public void onNext(String s) {
                    System.out.println("【Processor】接收数据:" + s);
                    subscription.request(10);  // 背压
                }
    
                @Override
                public void onError(Throwable throwable) {
                    System.out.println("【Processor】接收了错误数据");
                }
    
                @Override
                public void onComplete() {
                    System.out.println("【Processor】已成功接收完数据");
                }
    
                @Override
                public void subscribe(Subscriber subscriber) {
                    System.out.println("【Processor】订阅者完成订阅");
                }
            };
    
            // Step3: 创建订阅者
            Subscriber subscriber = new Subscriber() {
    
                Subscription subscription;
                @Override
                public void onSubscribe(Subscription subscription) {
                    System.out.println("【Subscriber】开始建立订阅关系");
                    this.subscription = subscription;
                    subscription.request(1);  // 第一次需要
                }
    
                @Override
                public void onNext(Object o) {
                    System.out.println("【Subscriber】接收数据:" + o);
                    // 业务处理
                    subscription.request(10); // 背压
                }
    
                @Override
                public void onError(Throwable throwable) {
                    System.out.println("【Subscriber】接收了错误数据");
                }
    
                @Override
                public void onComplete() {
                    System.out.println("【Subscriber】已成功接收完数据");
                }
            };
    
            // Step5: 订阅者和Processor建立订阅关系
            processor.subscribe(subscriber);
    
            // Step4: Processor和发布者建立订阅关系
            stringFlux.subscribe(processor);
    
            // Step6: 发布者发布数据(Flux/Mono一开始就发布了)
        }
        
    ---Processor】订阅者完成订阅
    【Processor】开始建立与发布者的订阅关系
    【Processor】接收数据:HelloProcessor】接收数据:WorldProcessor】已成功接收完数据
    
    
  • 例子3 -使用响应式编程进行简单业务处理

        /**
            * 输入: hello guys i am bole welcome to normal school jdk quick fox prizev
            * 输出: abcdefghijklmnopqrstuvwxyz
            */
        public static void main(String[] args) {
            String src = "hello guys i am bole welcome to normal school jdk quick fox prizev";
            Flux.fromArray(src.split(""))
                    .filter(e -> !e.equals(" "))
                    .flatMap(e -> Flux.just(e.split("")))
    //                .doOnNext(e -> {
    //                    System.out.println("doOnNext:" + e);
    //                })
                    .distinct()
                    .sort()
                    .subscribe(e -> {
                        System.out.print(e);
                    });
        }
        ---
        abcdefghijklmnopqrstuvwxyz
    
3)Flux的使用(常用方法:just()fromArray(),fromIterable()fromStream()empty()error()never()range(int start, int count)interval(Duration period)interval(Duration delay, Duration period)

Flux 表示的是包含 0 到 N 个元素的异步序列。在该序列中可以包含三种不同类型的消息通知:正常的包含元素的消息、序列结束的消息和序列出错的消息。当消息通知产生时,订阅者中对应的方法 onNext(), onComplete()onError() 会被调用

  1. just()的使用:

    可以指定序列中包含的全部元素。创建出来的 Flux 序列在发布这些元素之后会自动结束。

    Flux.just("hello", "world")
        .doOnNext((i) -> {
                System.out.println("[doOnNext] " + i);
            })
        .doOnComplete(() -> System.out.println("[doOnComplete]"))
        .subscribe(i -> System.out.println("[subscribe] " + i));
    
    // 执行结果[doOnNext] hello
    [subscribe] hello
    [doOnNext] world
    [subscribe] world
    [doOnComplete]
    
  2. fromArray(),fromIterable()和 fromStream():

    可以从一个数组、Iterable 对象或 Stream 对象中创建 Flux 对象。

    List<String> arr = Arrays.asList("flux", "mono", "reactor", "core");
    Flux.fromIterable(arr)
        .doOnNext((i) -> {
                System.out.println("[doOnNext] " + i);
            })
        .subscribe((i) -> {
                System.out.println("[subscribe] " + i);
            });
    //执行结果
    [doOnNext] flux
    [subscribe] flux
    [doOnNext] mono
    [subscribe] mono
    [doOnNext] reactor
    [subscribe] reactor
    [doOnNext] core
    [subscribe] core
    
    
  3. empty():

    创建一个不包含任何元素,只发布结束消息的序列

        Flux.empty()
        .doOnNext(i -> {
                System.out.println("[doOnNext] " + i);
            }).doOnComplete(() -> {
                System.out.println("[DoOnComplete] ");
            }).subscribe(i -> {
                System.out.println("[subscribe] " + i);
            });
    //执行结果
    [DoOnComplete]
    
    
  4. error(Throwable error)

    创建一个只包含错误消息的序列。

    try {
        int []arr = new int[5];
        arr[10] = 2;
    } catch (Exception e) {
        Flux.error(e).subscribe(i -> {
        System.out.println("error subscribe");
        });
    }
    //执行结果
    
  5. never():

    创建一个不包含任何消息通知的序列

    Flux.never()
        .doOnNext(i -> {
                System.out.println("doOnNext " + i);
            }).doOnComplete(() -> {
                System.out.println("doOnComplete");
            }).subscribe((i) -> {
                System.out.println("subscribe " + i);
            });
    //执行结果
  6. range(int start, int count)

    创建包含从 start 起始的 count个数量的 Integer 对象的序列。

    Flux.range(5, 10)
        .doOnNext(i -> {
                System.out.println("doOnNext " + i);
            }).doOnComplete(() -> {
                System.out.println("doOnComplete");
            }).subscribe((i) -> {
                System.out.println("subscribe " + i);
            });
    //执行结果
    doOnNext 5
    subscribe 5
    doOnNext 6
    subscribe 6
    doOnNext 7
    subscribe 7
    doOnNext 8
    subscribe 8
    doOnNext 9
    subscribe 9
    doOnNext 10
    subscribe 10
    doOnNext 11
    subscribe 11
    doOnNext 12
    subscribe 12
    doOnNext 13
    subscribe 13
    doOnNext 14
    subscribe 14
    doOnComplete
    
    
  7. interval(Duration period)和 interval(Duration delay, Duration period)

    创建一个包含了从 0 开始递增的 Long 对象的序列。其中包含的元素按照指定的间隔来发布。除了间隔时间之外,还可以指定起始元素发布之前的延迟时间

    Flux.interval(Duration.ofSeconds(4), Duration.ofSeconds(2))
        .doOnNext(i -> {
                System.out.println("doOnNext " + i);
            }).doOnComplete(() -> {
                System.out.println("doOnComplete " + new Date());
            }).subscribe((i) -> {
                System.out.println("subscribe " + i + ", date: " + new Date());
            });
    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    //执行结果
    doOnNext 0
    subscribe 0, date: Fri Jun 25 10:17:56 CST 2021
    doOnNext 1
    subscribe 1, date: Fri Jun 25 10:17:58 CST 2021
    doOnNext 2
    subscribe 2, date: Fri Jun 25 10:18:00 CST 2021
    doOnNext 3
    subscribe 3, date: Fri Jun 25 10:18:02 CST 2021
    
    
  8. intervalMillis(long period)和 intervalMillis(long delay, long period)

    与 interval()方法的作用相同,只不过该方法通过毫秒数来指定时间间隔和延迟时间。

4)Mono的使用(常用方法:fromCallable()fromCompletionStage()delay(Duration duration)ignoreElements(Publisher source)justOrEmpty(Optional<? extends T> data)

Mono 表示的是包含 0 或者 1 个元素的异步序列。该序列中同样可以包含与 Flux 相同的三种类型的消息通知。

  1. fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable()和 fromSupplier()

    分别从Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中创建 Mono。

        Mono.fromCallable(() -> {
                System.out.println("begin callable");
                return "Hello";
            })
        .subscribeOn(Schedulers.elastic())
        .doOnNext((i) -> System.out.println("doOnNext " + i + ", thread :" + Thread.currentThread().getName()))
        .subscribe(System.out::println);
    Thread.sleep(10000);
    //执行结果
    begin callable
    doOnNext Hello, thread :elastic-2
    Hello
    
    Mono.fromFuture(CompletableFuture.supplyAsync(() -> {
                System.out.println("begin");
                return "hello";
            }))
        .subscribeOn(Schedulers.elastic())
        .doOnNext((i) -> System.out.println("doOnNext " + i + ", thread :" + Thread.currentThread().getName()))
        .subscribe(System.out::println);
    Thread.sleep(10000);
    //执行结果
    begin
    doOnNext hello, thread :elastic-2
    hello
    
  2. delay(Duration duration)和 delayMillis(long duration)

    创建一个 Mono 序列,在指定的延迟时间之后,产生数字 0 作为唯一值

    Mono.delay(Duration.ofSeconds(1)).subscribe(System.out::println);
    Thread.sleep(3000);
    //执行结果, 延迟一秒后打印
    0
    
  3. ignoreElements(Publisher source)

    创建一个 Mono 序列,忽略作为源的 Publisher 中的所有元素,只产生结束消息

    Mono.ignoreElements((i) -> {
                System.out.println("ignoreElements");
            })
        .doOnNext((i) -> System.out.println("doOnNext " + i))
            .subscribe(System.out::println);
    //执行结果
    ignoreElements
    
  4. justOrEmpty(Optional<? extends T> data)和 justOrEmpty(T data)

    从一个 Optional 对象或可能为 null 的对象中创建 Mono。只有 Optional 对象中包含值或对象不为 null 时,Mono 序列才产生对应的元素

    Optional<Integer> optional = Optional.empty();
    Mono.justOrEmpty(optional)
        .doOnNext((i) -> System.out.println("doOnNext " + i))
        .subscribe(System.out::println);
    
    System.out.println("========");
    
    optional = Optional.of(100);
    Mono.justOrEmpty(optional)
        .doOnNext((i) -> System.out.println("doOnNext " + i))
        .subscribe(System.out::println);
    //执行结果
    ========
    doOnNext 100
    100
    
    
5、操作符的使用(bufferbufferTimeoutbufferUntilbufferWhilefilterwindowzipWithBiFunctiontakereducereduceWithmergemergeSequentialflatMapflatMapSequentialconcatMapcombineLatest
  1. buffer 和 bufferTimeout,bufferUntil 和 bufferWhile

    这两个操作符的作用是把当前流中的元素收集到集合中,并把集合对象作为流中的新元素。在进行收集时可以指定不同的条件:所包含的元素的最大数量或收集的时间间隔。

    • 方法 buffer()仅使用一个条件;
    • bufferTimeout()可以同时指定两个条件。指定时间间隔时可以使用 Duration 对象或毫秒数,即使用 bufferMillis()bufferTimeoutMillis()两个方法。

    除了元素数量和时间间隔之外,还可以通过 bufferUntilbufferWhile 操作符来进行收集。这两个操作符的参数是表示每个集合中的元素所要满足的条件的 Predicate 对象。

    • bufferUntil一直收集直到 Predicate 返回为 true。使得 Predicate 返回 true 的那个元素可以选择添加到当前集合或下一个集合中;
    • bufferWhile则只有当 Predicate 返回 true 时才会收集。一旦值为false,会立即开始下一次收集
    Flux.range(1, 100).buffer(20).subscribe(System.out::println);
    
    //执行结果
    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
    [21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40]
    [41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60]
    [61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80]
    [81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100]
    
  2. filter

    对流中包含的元素进行过滤,只留下满足 Predicate 指定条件的元素。

    Flux.range(1, 10)
        .filter(i -> i%2==0)
            .doOnNext(i -> {
            System.out.println("[doOnNext] " + i);
        })
        .subscribe(i -> {
            System.out.println("subscribe " + i);
        });
    //执行结果
    [doOnNext] 2
    subscribe 2
    [doOnNext] 4
    subscribe 4
    [doOnNext] 6
    subscribe 6
    [doOnNext] 8
    subscribe 8
    [doOnNext] 10
    subscribe 10
    
  3. window

    window 操作符的作用类似于 buffer,所不同的是 window 操作符是把当前流中的元素收集到另外的 Flux 序列中,因此返回值类型是 Flux

    Flux.range(1, 15).window(5)
        .doOnNext((flux -> {}))
        .subscribe(flux -> {
            flux.doOnNext((item) -> {
                System.out.println("[window] flux: " + item);
            })
        .doOnComplete(() -> System.out.println("flux item complete"))
        .subscribe();
    });
    // 执行结果
    [window] flux: 1
    [window] flux: 2
    [window] flux: 3
    [window] flux: 4
    [window] flux: 5
    flux item complete
    [window] flux: 6
    [window] flux: 7
    [window] flux: 8
    [window] flux: 9
    [window] flux: 10
    flux item complete
    [window] flux: 11
    [window] flux: 12
    [window] flux: 13
    [window] flux: 14
    [window] flux: 15
    flux item complete
    
    
  4. zipWith

    zipWith 操作符把当前流中的元素与另外一个流中的元素按照一对一的方式进行合并。在合并时可以不做任何处理,由此得到的是一个元素类型为 Tuple2 的流

    也可以通过一个 BiFunction 函数对合并的元素进行处理,所得到的流的元素类型为该函数的返回值。

    Flux.just("Hello", "Project")
        .zipWith(Flux.just("World", "Reactor"))
        .subscribe(System.out::println);
    
    System.out.println("======");
    
    Flux.just("Hello", "Project")
        .zipWith(Flux.just("World", "Reactor"), (s1, s2) -> String.format("%s!%s!", s1, s2))
        .subscribe(System.out::println);
    // 执行结果
    Hello,World
    Project,Reactor
    ======
    Hello!World!
    Project!Reactor!
    
    
  5. take:

    take 系列操作符用来从当前流中提取元素。提取的方式可以有很多种。

    take(long n)take(Duration timespan)和 takeMillis(long timespan):按照指定的数量或时间间隔来提取。

    • 1)take:从当前流中提取元素。

      Flux.range(1, 10).take(2).subscribe(System.out::println);
      // 执行结果
      1
      2
      
    • 2)takeLast(long n)提取流中的最后 N 个元素

      Flux.range(1, 10).takeLast(2).subscribe(System.out::println);
      // 执行结果
      9
      10
      
    • 3)takeUntil(Predicate<? super T> predicate):提取元素直到 Predicate 返回 true

      Flux.range(1, 10).takeUntil(i -> i == 6).subscribe(System.out::println);
      // 执行结果
      1
      2
      3
      4
      5
      6
      
      
    • 4)takeWhile(Predicate<? super T> continuePredicate): 当 Predicate 返回 true 时才进行提取。

      Flux.range(1, 10).takeWhile(i -> i < 5).subscribe(System.out::println);
      // 执行结果
      1
      2
      3
      4
      
    • 5)takeUntilOther(Publisher<?> other):提取元素直到另外一个流开始产生元素

      Flux.range(1, 5).takeUntilOther((i) -> {
          try {
              Thread.sleep(1000);
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      }).subscribe(System.out::println);
      // 执行结果,暂停1000ms后开始输出
      1
      2
      3
      4
      5
      
      
  6. reduce 和 reduceWith

    reducereduceWith 操作符对流中包含的所有元素进行累积操作,得到一个包含计算结果的 Mono 序列。累积操作是通过一个 BiFunction 来表示的。在操作时可以指定一个初始值。如果没有初始值,则序列的第一个元素作为初始值。

    Flux.range(1, 10)
        .reduce((x, y) -> {
            System.out.println("x:" + x + ", y:" + y);
            return x+y;
        })
        .subscribe(System.out::println);
    // 执行结果
    x:1, y:2
    x:3, y:3
    x:6, y:4
    x:10, y:5
    x:15, y:6
    x:21, y:7
    x:28, y:8
    x:36, y:9
    x:45, y:10
    55
    
    
    Flux.range(1, 10)
        .reduceWith(() -> 100, (x, y) -> {
            System.out.println("x:" + x + ", y:" + y);
                        return x+y;
                    })
                    .subscribe(System.out::println);
    // 执行结果
    x:100, y:1
    x:101, y:2
    x:103, y:3
    x:106, y:4
    x:110, y:5
    x:115, y:6
    x:121, y:7
    x:128, y:8
    x:136, y:9
    x:145, y:10
    155
    
    
  7. merge 和 mergeSequential

    mergemergeSequential 操作符用来把多个流合并成一个 Flux 序列。不同之处在于 merge 按照所有流中元素的实际产生顺序来合并,而 mergeSequential 则按照所有流被订阅的顺序,以流为单位进行合并

    Flux.merge(Flux.interval(
        Duration.of(0, ChronoUnit.MILLIS),
        Duration.of(100, ChronoUnit.MILLIS)).take(2),
        Flux.interval(
            Duration.of(50, ChronoUnit.MILLIS),
            Duration.of(100, ChronoUnit.MILLIS)).take(2))
        .toStream()
        .forEach(System.out::println);
    System.out.println("==============");
    Flux.mergeSequential(Flux.interval(
        Duration.of(0, ChronoUnit.MILLIS),
        Duration.of(100, ChronoUnit.MILLIS)).take(2),
        Flux.interval(
            Duration.of(50, ChronoUnit.MILLIS),
            Duration.of(100, ChronoUnit.MILLIS)).take(2))
        .toStream()
                    .forEach(System.out::println);
    // 执行结果
    0
    0
    1
    1
    ==============
    0
    1
    0
    1
    
    
  8. flatMap 和 flatMapSequential

    flatMapflatMapSequential 操作符把流中的每个元素转换成一个流,再把所有流中的元素进行合并flatMapSequentialflatMap 之间的区别与 mergeSequentialmerge 之间的区别是一样的。

    Flux.just(1, 2)
        .flatMap(x -> Flux.interval(Duration.of(x * 10, ChronoUnit.MILLIS), Duration.of(10, ChronoUnit.MILLIS)).take(x))
        .toStream()
        .forEach(System.out::println);
    // 执行结果
    0
    0
    1
    
  9. concatMap 和 combineLatest

    concatMap 操作符的作用也是把流中的每个元素转换成一个流,再把所有流进行合并。

    • flatMap 不同的是,concatMap根据原始流中的元素顺序依次把转换之后的流进行合并
    • flatMapSequential 不同的是,concatMap 对转换之后的流的订阅是动态进行的flatMapSequential 在合并之前就已经订阅了所有的流
    Flux.just(5, 10)
        .concatMap(x -> Flux.intervalMillis(x * 10, 100).take(x))
        .toStream()
        .forEach(System.out::println);
    
    Flux.combineLatest(
        Arrays::toString,
        Flux.intervalMillis(100).take(5),
        Flux.intervalMillis(50, 100).take(5)
    ).toStream().forEach(System.out::println);
    
6)其他说明
  • Flux中的map、flatMap、concatMap的区别

    参考Flux中的map、flatMap、concatMap的区别

    1)map:用于一对一的转换,返回一个新的Flux,元素顺序不变。

    2)flatMap:用于一对多的转换,返回一个新的Flux,元素顺序可能发生变化。

    3)concatMap:用于一对多的转换,返回一个新的Flux,元素顺序与原始Flux中的元素顺序保持一致。
    选择使用哪种方法取决于具体的业务需求和对元素顺序的要求。如果不关心元素顺序,可以考虑使用flatMap,它的并行执行可以提高性能。如果要保持元素顺序,可以使用concatMap,但要注意可能会影响性能。而map适用于简单的一对一转换场景。

2、WebFlux响应式编程说明(容易踩的坑、以及编程规范)(代码待实践验证)

参考响应式编程说明

1)基本概念

Publisher:发布者(数据流),表示数据的生产者。

Subscriber:订阅者,表示数据的消费者。

Mono: 包含0-1个数据的发布者,实现了Publisher

Flux: 包含0-n个数据的发布者,实现了Publisher

Operator: 操作符,表示对数据流中的数据的操作描述。用于改变发布者的行为。

当发布者被订阅时,发布者才开始生产消息:

编写代码实际上是使用操作符来一个描述数据处理逻辑,当发布者被订阅时才会执行这些处理逻辑

2)选择合适的操作符(响应式/非响应式方法,比如flatMap是响应式方法,doOnNext不是响应式方法,因为返回值类型是Consumer<T>

系统中大量使用到了reactor,其核心类只有2个:FluxMono

常用操作符:

  1. map: 转换上游数据: flux.map(UserEntity::getId)
  2. mapNotNull: 转换上游数据, 并忽略null值.(reactor 3.4提供)
  3. flatMap: 转换上游数据,但是结果是一个数据流,并将这个数据流平铺: flux.flatMap(this::findById)
  4. flatMapMany: 转换Mono中的元素为Flux(1个转多个): mono.flatMapMany(this::findChildren)
  5. concat: 将多个流连接在一起组成一个流(按顺序订阅) : Flux.concat(header,body)
  6. merge: 将多个流合并在一起, 同时订阅流: Flux.merge(save(info),saveDetail(detail))
  7. zip: 压缩多个流中的元素: Mono.zip(getData(id), getDetail(id), UserInfo::of)
  8. then: 上游流完成后执行其他的操作.
  9. doOnNext: 流中产生数据时执行.
  10. doOnError: 发送错误时执行.
  11. doOnCancel: 流被取消时执行.如: http未响应前, 客户端断开了连接.
  12. onErrorContinue: 流发生错误时,继续处理数据而不是终止整个流.
  13. defaultIfEmpty: 当流为空时,使用默认值.
  14. switchIfEmpty: 当流为空时,切换为另外一个流.
  15. as: 将流作为参数,转为另外一个结果:flux.as(this::save)

完整文档请查看官方文档

3)null处理

数据流中到元素不允许为null,因此在进行数据转换的时候要注意null处理

//存在缺陷
return this
    .findById(id)
    //getDescription可能返回null,为null时会抛出空指针,
    .map(UserEntity::getDescription); 

//使用以下方式替代
return this
    .findById(id)
    .mapNotNull(UserEntity::getDescription); 
4)非阻塞与阻塞

默认情况下,reactor调度器由数据的生产者(Publisher)决定。在WebFlux中则是netty的工作线程。 为了防止工作线程被阻塞导致服务崩溃,在一个请求的流中,禁止执行存在阻塞(如执行JDBC)可能的操作的。如果无法避免阻塞操作,应该指定调度器。

Note:之前在通过定时任务处理请求流时,出现任务阻塞未执行的问题,最后的解决方法是通过指定的调度器去执行阻塞的操作。

paramMono
    .publishOn(Schedulers.boundedElastic()) //指定调度器去执行下面的操作
    .map(param-> jdbcService.select(param))
5)上下文

在响应式中,大部分情况是禁止使用ThreadLocal的(可能造成内存泄漏)。因此基于ThreadLocal的功能都无法使用。reactor中引入了上下文,在一个流中,可共享此上下文 。通过上下文进行变量共享,例如事务, 权限等功能

//从上下文中获取
@GetMapping
public Mono<UserInfo> getCurrentUser(){
    return Mono
        .deferContextual(ctx->userService
            .findById(ctx.getOrEmpty("userId")
                            .orElseThrow(IllegalArgumentException::new));
}

//定义过滤器设置数据到上下文中
class MyFilter implements WebFilter{
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain){
        return chain
            .filter(exchange)
            .contextWrite(Context.of("userId",...))
    }
}
6)常见问题
a)问题一:我写的操作看上去是正确的,但是没有执行

有3种可能:上游流为空,多个流未组合在一起,在不支持响应式的地方使用了响应式

  • a)没有使用return关键字返回

    错误示例:

    public Mono<Response> handleRequest(Request request){
    // 没有return
        this
            .findOldData(request);
    }
    

    正确示例:

    public Mono<Response> handleRequest(Request request){
        return this
            .findOldData(request);
    }
    
  • b)上游流为空

    public Mono<Response> handleRequest(Request request){
        
        return this
            .findOldData(request)
            .flatMap(old -> {
                //这里为什么不执行? 
                return ....
            })
    }
    

    findOldData返回的流为空时,下游的flatMap等操作符需要操作流中元素的操作符是不会执行的。 可以通过switchIfEmpty操作符来处理空流的情况。

  • c)多个流未组合在一起

    只要方法返回值是Mono或者Flux,都不能单独行动;只要方法中调用了任何响应式操作,那这个方法也应该是响应式。(返回Mono或者Flux

    class Service{
        Mono<Void> handleRequest(request);
    }
    

    错误示例:

    //错误示例,handleRequest是响应式的,但是此方法没有使用响应式操作。
    public Result handleRequest(Request request){
        
        service.handleRequest(request);
    
        return ok;
    }
    

    正确示例:

    //正确示例
    public Mono<Result> handleRequest(Request request){
        return service
                //处理请求
                .handleRequest(request)
                //返回结果
                .thenReturn(ok);
    }
    
  • d)在不支持响应式的操作符中使用响应式

    错误示例:saveLog是响应式的,但是doOnNext并不支持响应式操作

    
    public Mono<Void> saveLog(Request req,Response resp){
        ...
    }
    
    public Mono<Result> handleRequest(Request request){
    
    return service
                //处理请求
                .handleRequest(request)
                //记录日志 此为错误的用法,saveLog是响应式的,但是doOnNext并不支持响应式操作
                .doOnNext(response-> saveLog(request,response) )
                //返回结果
                .thenReturn(ok);
    }
    

    doOnNext()方法并不支持响应式:从doOnNext方法的语义以及参数Consumer<T>可知,此方法是不支持响应式的(Consumer<T>只有参数没有返回值);saveLog是响应式的方法,因此这就不能在doOnNext()方法中使用响应式操作

    正确示例:

    return service
                //处理请求
                .handleRequest(request)
                //记录日志
                .flatMap(response-> saveLog(request,response) )
                //返回结果
                .thenReturn(ok);
    
  • e)在流内部订阅终止了整个流

    
    public Mono<Void> saveLog(Request req,Response resp){
        ...
    }
    

    错误示例:

    //错误
    public Mono<Response> handleRequest(Request request){
        
        return service
        //处理请求
        .handleRequest(request)
        //记录日志 此为错误的用法
        .flatMap(response-> {
            saveLog(request,response)
            .subscribe();
            return Mono.emtpy();
        })
        //返回结果
        .thenReturn(ok);
    }
    

    正确示例:

    //正确
    public Mono<Response> handleRequest(Request request){
        
        return service
        //处理请求
        .handleRequest(request)
        //记录日志
        .flatMap(response-> {
            return saveLog(request,response);
        })
        //返回结果
        .thenReturn(ok);
    }
    
b)问题二:我想获取流中的元素怎么办 - 不要试图取值
  • 不要试图从流中获取数据出来,而是先思考需要对流中元素做什么。

    需要对流中的数据进行操作时,都应该使用对应操作符来处理,根据Flux或者Mono提供的操作符API进行组合操作。

    传统编程方式:

    public List<Book> getAllBooks(){
        List<BookEntity> bookEntities = repository.findAll();
    
        List<Book> books = new ArrayList(bookEntities.size());
    
        for(BookEntity entity : bookEntities){
            Book book = entity.copyTo(new Book());
            books.add(book);
        }
    
        return books;
    }
    

    错误示例:响应式编程错误取值示例

    public Book getAllBooks(){
    
        return getRepository()
            .createQuery()
            .where("id",1)
            .fetchOne()
            .block();
    }
    
    

    警告

    在响应式编程中,在任何时候执行业务代码时都不要使用block()方法,使用block()方法可能引发以下问题:

    1. 阻塞线程:调用block()方法会阻塞当前线程,导致无法处理其他并发请求。这会降低系统的吞吐量和响应性能
    2. 死锁风险:如果在处理响应式流时使用了block()方法,而其中某些操作也依赖于同一个线程的结果,则可能导致死锁。
    3. 内存资源浪费:阻塞调用将持续占用线程,而每个线程都需要额外的内存资源。如果应用程序中同时有大量的阻塞操作,可能导致线程池耗尽和内存资源浪费。

    正确示例:响应式编程正确取值示例

    为了避免使用block(),我们应该尽可能地使用响应式操作符(如map、flatMap、filter等)对数据流进行转换和处理,并使用其他响应式方法(如subscribe())来订阅数据流并触发异步处理

    public Flux<Book> getAllBooks(){
        return repository
                .findAll()
                .map(entity-> entity.copyTo(new Book()))
    }
    
    
c)问题三:我需要在非响应式方法中使用响应式怎么办?
  • 尽量减少在响应式方法中使用block()

    public void handleRequest(Request request){
    
        //不到万不得已请勿使用block方法
        //logService.saveLog(request).block()
    
        //
        logService
        .saveLog(request)
        .subscribe(
            result->log.debug("保存成功 {}",request),
            error->log.warn("保存失败 {}",request,error)
        )
    }
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1527172.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【视频异常检测】Delving into CLIP latent space for Video Anomaly Recognition 论文阅读

Delving into CLIP latent space for Video Anomaly Recognition 论文阅读 ABSTRACT1. Introduction2. Related Works3. Proposed approach3.1. Selector model3.2. Temporal Model3.3. Predictions Aggregation3.4. Training 4. Experiments4.1. Experiment Setup4.2. Evaluat…

Linux设置IP地址多种方法保姆级教程,外加修改主机名并通过生成密钥,组建机群。

[引入提问] 怎么设置linux服务器的名称&#xff1f; 方法1&#xff1a; nmtui—自动打开设置机器名称的操作界面---输入即可。重启hostnamed服务&#xff0c; systemctl restart system-hostnamed 方法2&#xff1a; 直接编辑 vim /etc/hostname &#xff0c;输入新的…

VR历史建筑漫游介绍|虚拟现实体验店加盟|VR设备购买

VR历史建筑漫游是一种利用虚拟现实技术&#xff0c;让用户可以身临其境地参观和探索历史建筑的体验。通过VR头显和相关设备&#xff0c;用户可以在虚拟环境中自由移动和互动&#xff0c;感受历史建筑的真实氛围和文化内涵。 在VR历史建筑漫游中&#xff0c;您可以选择不同的历史…

Vue+SpringBoot打造数据可视化的智慧河南大屏

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块三、系统展示四、核心代码4.1 数据模块 A4.2 数据模块 B4.3 数据模块 C4.4 数据模块 D4.5 数据模块 E 五、免责说明 一、摘要 1.1 项目介绍 基于JAVAVueSpringBootMySQL的数据可视化的智慧河南大屏&#xff0c;包含了GDP、…

SpringCloud搭建微服务之Micrometer分布式链路追踪

1. 概述 由于Spring Cloud Sleuth最新版本只支持Spring Boot 2.7.x&#xff0c;核心项目已经迁移到Micrometer Traceing项目&#xff0c;Spring Boot 3.x版本要实现分布式链路追踪需要集成Micrometer。更多详情可以参阅Micrometer官网 本文将以Spring Boot 3.2.x和Spring Clo…

视频私有云,HDMI/AV多硬件设备终端接入,SFU/MCU视频会议交互方案。

在视频业务深入的过程中越来越多的硬件设备接入视频交互的视频会议中远程交互&#xff0c;有的是视频采集&#xff0c;有的是医疗影像等资料&#xff0c;都需要在终端承显&#xff0c;这就需要我们的设备终端能多设备&#xff0c;多协议接入&#xff0c;设备接入如下。 1&#…

Python Web开发记录 Day13:Django part7 Ajax入门与案例(任务管理)

名人说&#xff1a;莫道桑榆晚&#xff0c;为霞尚满天。——刘禹锡&#xff08;刘梦得&#xff0c;诗豪&#xff09; 创作者&#xff1a;Code_流苏(CSDN)&#xff08;一个喜欢古诗词和编程的Coder&#x1f60a;&#xff09; 目录 1、Ajax入门①简介②工作原理③优点④缺点⑤使用…

Poly Kernel Inception Network在遥感检测中的应用

摘要 https://export.arxiv.org/pdf/2403.06258 遥感图像&#xff08;RSI&#xff09;中的目标检测经常面临一些日益严重的挑战&#xff0c;包括目标尺度的巨大变化和多样的上下文环境。先前的方法试图通过扩大骨干网络的空间感受野来解决这些挑战&#xff0c;要么通过大核卷积…

易基因:人类大脑的单细胞DNA甲基化和3D基因组结构|Science

大家好&#xff0c;这里是专注表观组学十余年&#xff0c;领跑多组学科研服务的易基因。 高通通量表观基因组分析技术可用于阐明大脑中细胞复杂性的基因调控程序。5-甲基胞嘧啶 (5mCs)是哺乳动物基因组中最常见的修饰碱基&#xff0c;大多数5mCs发生在胞嘧啶-鸟嘌呤二核苷酸&a…

Spring Boot 实现程序的优雅退出

&#x1f3f7;️个人主页&#xff1a;牵着猫散步的鼠鼠 &#x1f3f7;️系列专栏&#xff1a;Java全栈-专栏 &#x1f3f7;️个人学习笔记&#xff0c;若有缺误&#xff0c;欢迎评论区指正 目录 前言 优雅停机是什么 SpringBoot如何实现优雅停机 合理杀死进程 法一&…

企业开展开源安全治理必要性及可行性详细分析

背景 开源软件安全威胁是近几年企业安全面临的主要威胁&#xff0c;也是企业应用安全方向讨论的热门话题&#xff0c;但是由于是新的需求新的方向&#xff0c;很多企业在观望&#xff0c;当前开展这项工作是否已经成熟&#xff0c;项目成功率如何&#xff1f; 当新鲜事物产生时…

【iOS】——Blocks

文章目录 前言一、Blocks概要1.什么是Blocks 二、Block模式1.block语法2.block类型变量3.截获自动变量值4._Block修饰符5.截获的自动变量 三、Blocks的实现1.Block的实质2.截获自动变量值3._Block说明符4.Block存储域 前言 一、Blocks概要 1.什么是Blocks Blocks是C语言的扩…

聚类分析 | Matlab实现基于NNMF+DBO+K-Medoids的数据聚类可视化

聚类分析 | Matlab实现基于NNMFDBOK-Medoids的数据聚类可视化 目录 聚类分析 | Matlab实现基于NNMFDBOK-Medoids的数据聚类可视化效果一览基本介绍程序设计参考资料 效果一览 基本介绍 NNMFDBOK-Medoids聚类&#xff0c;蜣螂优化算法DBO优化K-Medoids 非负矩阵分解&#xff08…

mac清除dns缓存指令 mac清除缓存怎么清理

你是否曾经被要求清理dns缓存并刷新&#xff1f;清理dns缓存一般是由于修改了主机文件&#xff0c;或者想排除一些网络上的故障。在Mac上清除dns缓存需要使用命令行来实现。在本文中&#xff0c;软妹子将向大家介绍mac清除dns缓存指令&#xff0c;并展示mac清除缓存怎么清理。 …

Stable Diffusion WebUI 生成参数:宽度/高度/生成批次/每批数量/提示词相关性/随机种子

本文收录于《AI绘画从入门到精通》专栏&#xff0c;专栏总目录&#xff1a;点这里。 大家好&#xff0c;我是水滴~~ 本文将继续了解 Stable Diffusion WebUI 的生成参数&#xff0c;主要内容有&#xff1a;宽度、高度、生成批次、每批数量、提示词相关性、随机种子。希望能对你…

路由器级联

目录 一、实现功能二、实现步骤2.1 接线步骤 三、效果3.1 常规连接3.2 路由器级联 一、实现功能 主路由器&#xff1a;可有WiFi功能&#xff0c;LAN口下接各设备&#xff0c;并接一个辅路由器辅路由器&#xff1a;开启WiFi功能&#xff0c;有线或无线下接各设备功能&#xff1…

Docker部署TeamCity来完成内部CI、CD流程

使用TeamCity来完成内部CI、CD流程 本篇教程主要讲解基于容器服务搭建TeamCity服务&#xff0c;并且完成内部项目的CI流程配置。至于完整的DevOps&#xff0c;我们后续独立探讨。 一个简单的CI、CD流程 以下分享一个简单的CI、CD流程&#xff08;仅供参考&#xff09;&#…

量子计算的飞跃!澳研究团队发明新型光量子处理器

内容来源&#xff1a;量子前哨&#xff08;ID&#xff1a;Qforepost&#xff09; 编辑丨慕一 编译/排版丨沛贤 深度好文&#xff1a;1800字丨12分钟阅读 澳大利亚研究人员成功研制出全球首款可重新编程的基于光的量子处理器&#xff0c;这一创新成果通过最小化光损失&#xf…

Linux操作系统的安全相关介绍

Linux操作系统的安全模型、访问控制、安全策略和加密机制是确保系统安全的重要组成部分。下面将详细介绍这些方面。 安全模型 Linux操作系统的安全模型基于传统的Unix安全模型&#xff0c;主要包括以下核心概念&#xff1a; 1. **用户和组**&#xff1a;Linux系统中的每…

数据结构——lesson10排序之插入排序

&#x1f49e;&#x1f49e; 前言 hello hello~ &#xff0c;这里是大耳朵土土垚~&#x1f496;&#x1f496; &#xff0c;欢迎大家点赞&#x1f973;&#x1f973;关注&#x1f4a5;&#x1f4a5;收藏&#x1f339;&#x1f339;&#x1f339; &#x1f4a5;个人主页&#x…