重学SpringBoot3-Spring WebFlux之Reactor事件感知 API

news2025/1/17 8:46:33

更多SpringBoot3内容请关注我的专栏:《SpringBoot3》
期待您的点赞👍收藏⭐评论✍

在这里插入图片描述

Spring WebFlux之Reactor事件感知 API

  • 1. 什么是 `doOnXxx` 系列 API?
  • 2. `doOnXxx` API 的常用方法
    • 2.1 `doOnNext()`
      • 示例:
      • 输出:
    • 2.2 `doOnError()`
      • 示例:
      • 输出:
    • 2.3 `doOnComplete()`
      • 示例:
      • 输出:
    • 2.4 `doOnSubscribe()`
      • 示例:
      • 输出:
    • 2.5 `doOnCancel()`
      • 示例:
      • 输出:
    • 2.6 `doFinally()`
      • 示例:
      • 输出:
    • 2.7 `doOnTerminate()`
      • 示例:
      • 输出:
    • 2.8 `doOnEach()`
      • 示例:
      • 输出:
    • 2.9 `doOnDiscard()`
      • 示例:
      • 输出:
    • 2.10 `doOnRequest()`
      • 示例:
      • 输出:
  • 3. `doOnXxx` 的应用场景
  • 4. 总结

在 Spring Boot 3 中,响应式编程通过 Reactor 库得到了广泛应用,提供了强大的流式数据处理能力。为了增强对流式数据流的调试和处理能力,Reactor 提供了一组非常重要的事件感知(side-effect)API,也就是我们常听到的 doOnXxx 系列方法。

这篇博客将详细介绍 doOnXxx 系列 API 的功能和用法,帮助大家更好地理解它们在响应式流中的作用,并展示其在实际开发中的一些应用场景。

1. 什么是 doOnXxx 系列 API?

doOnXxx 系列方法是 Reactor 提供的一组用于在流操作过程中执行副作用的 API。它们不会改变流的内容或数据流本身,而是允许我们在特定的生命周期事件发生时进行操作(如日志记录、调试、监控等)。

doOnXxx

这些 API 名称中的 Xxx 代表不同的事件类型,比如:

  • doOnNext(): 当下一个元素被发出时执行操作。
  • doOnError(): 当流中出现错误时执行操作。
  • doOnComplete(): 当流完成时执行操作。
  • doOnSubscribe(): 当订阅发生时执行操作。

这些方法非常适合用于监控、调试或者记录流的行为。

2. doOnXxx API 的常用方法

下面我们依次介绍常见的 doOnXxx API,并通过简单的示例进行演示。

2.1 doOnNext()

doOnNext() 方法允许你在每个元素被发布时执行操作,通常用于对每个数据元素进行日志记录、调试或者进行某种副作用操作。

示例:

        Flux<String> flux = Flux.just("Spring", "Boot", "3","Reactor")
                .doOnNext(value -> System.out.println("Processing value: " + value))
                .map(String::toUpperCase);

        flux.subscribe(System.out::println);

输出:

doOnNext()

在这个例子中,doOnNext() 被用于每个元素发出时打印日志。这对于调试非常有用,可以清楚看到每个数据元素何时被处理。

2.2 doOnError()

doOnError() 方法允许你在流中出现异常时执行操作,通常用于记录异常信息、执行错误处理逻辑等。

示例:

        Flux<Integer> fluxWithError = Flux.just(1, 2, 0)
                .map(i -> 10 / i)  // 这里会抛出 ArithmeticException: / by zero
                .doOnError(e -> System.err.println("Error occurred: " + e.getMessage()));

        fluxWithError.subscribe(
                System.out::println,
                error -> System.err.println("Subscriber received error: " + error)
        );

输出:

doOnError()

在这个例子中,Flux 被用来创建一个数据流,并且在这个数据流中执行了一些操作,包括可能抛出异常的操作。下面是对消费者和生产者异常捕获的区别:
生产者异常捕获:

  • 在生产者端,可以使用 doOnError 方法来捕获并处理异常,这个方法会在数据流中发生错误时被调用。
  • doOnError 可以用于记录日志或执行一些清理操作,它不会改变数据流的行为,但数据流会被终止。

消费者异常捕获:

  • 在消费者端,可以通过 subscribe 方法的第二个参数(错误处理回调)来捕获并处理异常。
  • 这个错误处理回调会在数据流中发生错误时被调用,可以用于记录日志或执行其他错误处理逻辑。

2.3 doOnComplete()

doOnComplete() 方法在流完成时(即没有更多元素发出)执行操作。你可以利用它在流结束时执行一些收尾工作,比如关闭资源、统计处理结果等。

示例:

        Flux<String> flux = Flux.just("Spring", "Boot", "3","Reactor")
                .doOnComplete(() -> System.out.println("Stream completed"));

        flux.subscribe(System.out::println);

输出:

doOnComplete()

这里,doOnComplete() 用于在数据流结束时打印一条日志,通知处理完成。

2.4 doOnSubscribe()

doOnSubscribe() 允许你在流被订阅时执行操作。它通常用于监控订阅事件,适合用于统计订阅数或进行相关的初始化操作。

示例:

        Flux<String> flux = Flux.just("A", "B", "C")
                .doOnSubscribe(subscription -> System.out.println("Subscription started"));

        flux.subscribe(System.out::println);

输出:

doOnSubscribe()

在这个例子中,当流被订阅时,doOnSubscribe() 被调用,打印订阅开始的日志。

2.5 doOnCancel()

doOnCancel() 方法在取消订阅时执行操作。取消订阅通常是在消费者不再需要流数据时发生的(例如手动取消订阅或者发生超时等情况),可以用于处理一些资源释放的操作。

示例:

Flux<String> flux = Flux.just("A", "B", "C")
    .doOnCancel(() -> System.out.println("Subscription canceled"))
    .take(2);  // 只取前两个元素,第三个元素将被跳过(取消)

flux.subscribe(System.out::println);

输出:

doOnCancel()

这里 doOnCancel() 在流被取消时执行了取消订阅的操作。

2.6 doFinally()

doFinally() 是一个非常有用的方法,它在流结束时始终会被调用(无论是正常完成、错误还是取消订阅)。它类似于 try-finally 语句中的 finally,适合做一些无论流如何结束都需要执行的操作,如清理资源等。

示例:

        Flux<String> flux = Flux.just("A", "B", "C")
                .doFinally(signalType -> System.out.println("Stream ended with signal: " + signalType));

        flux.subscribe(System.out::println);

输出:

doFinally()

doFinally() 可以捕捉到不同类型的信号,包括 onComplete, onErroronCancel

2.7 doOnTerminate()

doOnTerminate() 在流完成或出错时执行操作。它是 doOnComplete()doOnError() 的组合,但不区分流是正常完成还是出现错误,只要流结束了,它就会被调用。

示例:

        Flux<String> flux = Flux.just("A", "B", "C")
                .doOnTerminate(() -> System.out.println("Stream terminated"));

        flux.subscribe(System.out::println);

输出:

doOnTerminate()

它在流结束时总会执行,不管是否出现错误。

2.8 doOnEach()

doOnEach() 是一个非常通用的事件感知 API,它允许对流中的每一个信号(包括 onNextonErroronCompleteonSubscribe)进行统一处理。这个方法会接收一个 Signal 对象,表示当前发生的事件类型,从而可以处理不同的信号类型。

示例:

        Flux<String> flux = Flux.just("Spring", "Boot", "3", "Reactor")
                .doOnEach(signal -> {
                    if (signal.isOnNext()) {
                        System.out.println("Element received: " + signal.get());
                    } else if (signal.isOnError()) {
                        System.err.println("Error occurred: " + signal.getThrowable().getMessage());
                    } else if (signal.isOnComplete()) {
                        System.out.println("Stream completed");
                    }
                });

        flux.subscribe(System.out::println);

输出:

doOnEach()

2.9 doOnDiscard()

doOnDiscard() 方法用于处理被 丢弃的元素。当某些元素由于某种原因(例如 filter() 操作或上游取消)没有被使用时,可以通过 doOnDiscard() 来感知这些元素的丢弃,并执行相关的操作(如清理资源、记录日志等)。

可能使用 doOnDiscard 钩子的例子包括以下情况:

  • filter: 不符合过滤器的项被视为 “丢弃”。
  • skip:跳过的项将被丢弃。
  • buffer(maxSize, skip)maxSize < skip:“丢弃的缓冲区” — 缓冲区之间的元素被丢弃。

示例:

        Flux<String> flux = Flux.just("AA", "BB", "C", "D", "E")
                .filter(s -> s.length() > 1)
                .doOnDiscard(String.class, discardedValue ->
                        System.out.println("Discarded: " + discardedValue));

        flux.subscribe(System.out::println);

输出:

doOnDiscard()

2.10 doOnRequest()

doOnRequest() 是一个用于处理 背压请求(request signals) 的 API,它允许你在下游请求元素时执行操作。响应式流中上游发送元素的数量通常由下游通过请求背压机制控制,因此 doOnRequest() 可以帮助我们监控下游对元素的需求。

示例:

        Flux<Integer> flux = Flux.range(1, 5)
                .doOnRequest(request ->
                        System.out.println("Request for: " + request + " elements"));

        flux.subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                s.request(3);
            }

            @Override
            public void onNext(Integer integer) {
                System.out.println("Received: " + integer);
            }

            @Override
            public void onError(Throwable t) {

            }

            @Override
            public void onComplete() {

            }
        });  // 请求 3 个元素

输出:

doOnRequest()

3. doOnXxx 的应用场景

  1. 日志记录与调试:在流的不同阶段插入 doOnXxx,帮助我们记录每个阶段的状态变化或异常情况,从而更好地调试响应式流。
  2. 监控和统计:我们可以使用 doOnSubscribe()doOnComplete() 结合监控系统来统计订阅的数量、完成的流数量,分析流的性能。
  3. 资源管理:使用 doFinally() 进行资源释放和清理,确保无论流如何结束都能进行相应的收尾工作。
  4. 错误处理:使用 doOnError() 可以在发生错误时记录日志、发送通知或者做出其他相应的处理。

4. 总结

Reactor 的 doOnXxx 系列 API 是在响应式流中进行事件感知和副作用处理的强大工具。它们的主要作用是让开发者能够在不干扰流式数据处理的情况下,插入额外的操作,如调试、监控、资源清理等。通过合理使用 doOnNext()doOnError()doFinally() 等方法,我们可以更好地理解和控制响应式流的执行过程,从而构建更加健壮和高效的应用程序。

希望这篇文章能帮助你更好地掌握 doOnXxx 系列方法。如果你有任何问题或建议,欢迎讨论!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2225091.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

OCR经典神经网络(三)LayoutLM v2算法原理及其在发票数据集上的应用(NER及RE)

OCR经典神经网络(三)LayoutLM v2算法原理及其在发票数据集上的应用(NER及RE) LayoutLM系列模型是微软发布的、文档理解多模态基础模型领域最重要和有代表性的工作&#xff1a; LayoutLM v2&#xff1a;在一个单一的多模态框架中对文本&#xff08;text&#xff09;、布局&…

OpenAI GPT-o1实现方案记录与梳理

本篇文章用于记录从各处收集到的o1复现方案的推测以及介绍 目录 Journey Learning - 上海交通大学NYUMBZUAIGAIRCore IdeaKey QuestionsKey TechnologiesTrainingInference A Tutorial on LLM Reasoning: Relevant methods behind ChatGPT o1 - UCL汪军教授Core Idea先导自回归…

anaconda 创建环境失败 解决指南

anaconda 创建环境失败 解决指南 一、问题描述 我在宿舍有一台电脑。由于我经常泡在实验室&#xff0c;所以那台电脑不是经常用&#xff0c;基本吃灰。昨天晚上突然有在那台电脑上使用Camel-AI部署多智能体协同需求&#xff0c;便戳开了电脑&#xff0c;问题也随之而来。 当…

开源实时数仓的构建

设计计思路 基本思路 开源数据平台的设计思路是通过 Flink SQL Batch、StartRocks SQL 、StartRocks物化视图 的能力实现一个离线任务的开发&#xff1b;使用 DolphinScheduler 进行离线工作流编排和调度&#xff1b;通过 Flink CDC 和 Flink SQL 实现流处理能力&#xff0c;进…

【自然语言处理】BERT模型

BERT&#xff1a;Bidirectional Encoder Representations from Transformers BERT 是 Google 于 2018 年提出的 自然语言处理&#xff08;NLP&#xff09;模型&#xff0c;它基于 Transformer 架构的 Encoder 部分。BERT 的出现极大提升了 NLP 任务的性能&#xff0c;如问答系…

Linux基础知识 - C(自学使用)

1.C语言基础知识 参考博客&#xff1a; https://blog.csdn.net/qq_45254369/article/details/126023482?ops_request_misc%257B%2522request%255Fid%2522%253A%252277629891-A0F3-4EFC-B1AC-410093596085%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fblog.%…

【Canvas与图标】六色彩虹圆角六边形图标

【成图】 120*120的png图标 以下是各种大小图&#xff1a; 【代码】 <!DOCTYPE html> <html lang"utf-8"> <meta http-equiv"Content-Type" content"text/html; charsetutf-8"/> <head><title>六色彩虹圆角六边形…

Android简单控件实现简易计算器

学了一些Android的简单控件&#xff0c;用这些布局和控件&#xff0c;设计并实现一个简单计算器。 计算器的界面分为两大部分&#xff0c;第一部分是上方的计算表达式&#xff0c;既包括用户的按键输入&#xff0c;也包括计算结果 数字&#xff1b;第二部分是下方的各个按键&a…

1U服务器和Hyper-V虚拟机使用记录

记录最近接触服务器和虚拟机的一些使用操作知识 背景&#xff1a;1U服务器上架使用&#xff0c;备份其他服务器vm虚拟机&#xff0c;Hyper-V管理虚拟机使用测试 设备&#xff1a;IBM3550服务器交换机&#xff0c; 移动硬盘&#xff1a;附加存储盘&#xff0c; u盘1&#xff1…

在虚拟化环境中,虚拟机的资源分配是否真的能够完全等效于物理服务器?是否有某些特定的工作负载在虚拟化环境中始终无法达到理想表现?

目录 1. 虚拟化技术的基本原理与资源管理 2. 资源分配的等效性问题 3. 特定工作负载在虚拟化环境中的表现 4. 性能优化与虚拟化环境的选择 5. 结论 虚拟化技术的广泛应用为数据中心的资源管理与部署带来了革命性的变化。虚拟机&#xff08;VM&#xff09;通过抽象化的方式…

【了解一下静态代理与动态代理】

文章目录 一.什么是静态代理与动态代理二.静态代理三.动态代理1.jdk动态代理2.cglib动态代理 四.小结 一.什么是静态代理与动态代理 什么是代理&#xff1f;代理是一种设计模式&#xff0c;在这种模式中&#xff0c;一个类&#xff08;代理类&#xff09;代表另一个类&#xff…

【ArcGIS Pro实操第8期】绘制WRF三层嵌套区域

【ArcGIS Pro实操第8期】绘制WRF三层嵌套区域 数据准备ArcGIS Pro绘制WRF三层嵌套区域Map-绘制三层嵌套区域更改ArcMap地图的默认显示方向指定数据框范围 Map绘制研究区Layout-布局出图 参考 本博客基于ArcGIS Pro绘制WRF三层嵌套区域&#xff0c;具体实现图形参考下图&#xf…

cloak斗篷伪装下的独立站

随着互联网的不断进步&#xff0c;越来越多的跨境电商卖家开始认识到独立站的重要性&#xff0c;并纷纷建立自己的独立站点。对于那些有志于进入这一领域的卖家来说&#xff0c;独立站是什么呢&#xff1f;独立站是指个人或小型团队自行搭建和运营的网站。 独立站能够帮助跨境…

C++ 模板编程:解锁高效编程的神秘密码

快来参与讨论&#x1f4ac;&#xff0c;点赞&#x1f44d;、收藏⭐、分享&#x1f4e4;&#xff0c;共创活力社区。 目录 &#x1f4af;前言 &#x1f4af;泛型编程 &#x1f4af;函数模板 1.函数模板概念 2.函数模板格式 3.函数模板的原理 4.函数模板的实例化 5.模板参…

如何使用DBeaver连接flink

通过DBeaver配置并连接flink&#xff1a; 1. 打开Dbeaver&#xff0c;选择“数据库”--》“数据库驱动管理器” 2.在驱动管理器界面点击新建按钮 3.在新建弹窗输入flink相关的驱动信息&#xff0c;主要包括&#xff1a; org.apache.flink.table.jdbc.FlinkDriver jdbc:flink…

Spring+ActiveMQ

1. 环境搭建 1.1 env-version JDK 1.8 Spring 2.7.13 Maven 3.6 ActiveMQ 5.15.2 1.2 docker-compose.yml version: 3.8services:activemq:image: rmohr/activemq:5.16.3container_name: activemqports:- "61616:61616"- "8161:8161"environment…

图解:什么是多租户?

大家好&#xff0c;我是汤师爷~ 什么是多租户&#xff1f; 多租户是SaaS&#xff08;软件即服务&#xff09;领域里特有的一个概念。在SaaS服务中&#xff0c;“租户”指的就是使用这个SaaS系统的客户。 那么租户和用户有什么区别呢&#xff1f;举个例子。假设你正在使用一款…

SQL实战训练之,力扣:1532最近的三笔订单

目录 一、力扣原题链接 二、题目描述 三、建表语句 四、题目分析 五、SQL解答 六、最终答案 七、验证 八、知识点 一、力扣原题链接 1532. 最近的三笔订单 二、题目描述 客户表&#xff1a;Customers ------------------------ | Column Name | Type | --------…

【C++单调栈 贡献法】907. 子数组的最小值之和|1975

本文涉及的基础知识点 C单调栈 LeetCode907. 子数组的最小值之和 给定一个整数数组 arr&#xff0c;找到 min(b) 的总和&#xff0c;其中 b 的范围为 arr 的每个&#xff08;连续&#xff09;子数组。 由于答案可能很大&#xff0c;因此 返回答案模 109 7 。 示例 1&#x…

ArcGIS计算多个面要素范围内栅格数据各数值的面积

本文介绍在ArcMap软件中&#xff0c;基于面积制表工具&#xff08;也就是Tabulate Area工具&#xff09;&#xff0c;基于1个面要素数据集与1个栅格数据&#xff0c;计算每一个面要素中各栅格数据分布面积的方法。 首先&#xff0c;来看一下本文的需求。现有一个矢量面的要素集…