Spring WebFlux之流式输出

news2025/3/20 17:38:10

🎉🎉🎉🎉🎉🎉
欢迎访问的个人博客:https://swzbk.site/,加好友,拉你入福利群
🎉🎉🎉🎉🎉🎉

流式输出(Streaming Output)是指将数据分块逐步发送给客户端,而不是一次性发送所有数据。这种方式特别适合处理大文件、实时数据或需要逐步展示的场景(如deepseek响应、语音、视频、日志等)。在springboot中通过Spring WebFlux实现。

1. Flux是什么?

  • 定义:Flux是一个异步数据流处理库,用于生成、操作和消费数据流(类似集合,但支持异步和非阻塞操作)。
  • 核心特点
    • 背压(Backpressure):消费者可以控制生产者的速度,避免数据过载。
    • 函数式编程:通过链式操作(如mapfilterflatMap)处理数据流。
    • 异步非阻塞:基于Reactor Netty实现高性能I/O,适合高并发场景。

2. Flux在后端Java中的作用

(1)处理异步与高并发
  • 场景:微服务通信、实时数据处理(如消息队列、日志监控)、长连接(WebSocket)。
  • 优势:通过异步非阻塞方式减少线程资源消耗,提升系统吞吐量。
(2)响应式Web开发
  • 与Spring WebFlux结合:构建响应式REST API,支持HTTP/2和Server-Sent Events(SSE)。
  • 示例代码
    @GetMapping("/events")
    public Flux<Event> getEvents() {
        return Flux.interval(Duration.ofSeconds(1))
                    .map(sequence -> new Event("Event " + sequence));
    }
    
    (每秒推送一个事件给客户端)
(3)背压管理
  • 避免内存溢出:当消费者处理速度慢于生产者时,Flux通过背压机制暂停生产,确保系统稳定。
(4)数据流操作
  • 丰富的操作符:支持过滤、转换、合并、重试等复杂逻辑。
    Flux.just(1, 2, 3)
        .map(n -> n * 2)
        .filter(n -> n % 3 == 0)
        .subscribe(System.out::println); // 输出:6
    

3. 为什么选择Flux?

  • 与Spring生态集成:无缝衔接Spring Boot、Spring Cloud,适合企业级应用。
  • 轻量高效:相比传统阻塞IO,资源占用更少,适合云原生环境。
  • 对比RxJava:Flux是Spring官方推荐的响应式库,更注重与Java生态的兼容性。

4. 案例实现

在 Java 中使用 OkHttp3 发送请求,并通过 Project Reactor 的 Flux 获取实时响应,通常适用于处理流式数据,比如服务器发送的实时更新或者大型数据块的逐步传输。以下为你详细介绍实现步骤并给出示例代码。

实现思路
  1. 引入依赖:需要在项目中引入 OkHttp3 和 Project Reactor 的依赖。
  2. 发送请求:使用 OkHttp3 发送 HTTP 请求。
  3. 处理响应:将 OkHttp3 的响应流转换为 Flux 进行响应式处理。
4.1 添加依赖

如果你使用的是 Maven 项目,在 pom.xml 中添加以下依赖:

<dependencies>
    <!-- OkHttp3 -->
    <dependency>
        <groupId>com.squareup.okhttp3</groupId>
        <artifactId>okhttp</artifactId>
        <version>4.9.3</version>
    </dependency>
     <!--1: webflux 会默认引入下面的单独reactor-core-->
    <dependency>
	    <groupId>org.springframework.boot</groupId>
	    <artifactId>spring-boot-starter-webflux</artifactId>
	</dependency>
    <!-- 2:Project Reactor Core、单独引入 -->
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
        <version>3.4.16</version>
    </dependency>
</dependencies>
4.2 编写代码

你提供的代码片段是结合了 OkHttp3 发起异步请求和 Project Reactor 的 Flux 来处理响应结果。下面为你完善这个示例,并详细解释代码逻辑。

示例代码
import okhttp3.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

import java.io.IOException;

public class OkHttpFluxExample {
    private static final OkHttpClient client = new OkHttpClient();

    public static Flux<String> makeApiCall(Request apiRequest) {
        return Flux.create(emitter -> {
            try {
                // 发起异步请求
                client.newCall(apiRequest).enqueue(new Callback() {
                    @Override
                    public void onFailure(Call call, IOException e) {
                        // 处理请求失败的情况
                        System.err.println("请求 API 失败: " + e.getMessage());
                        emitter.error(e);
                    }

                    @Override
                    public void onResponse(Call call, Response response) throws IOException {
                        try (ResponseBody responseBody = response.body()) {
                            if (response.isSuccessful() && responseBody != null) {
                                // 假设响应是逐行文本数据,逐行发布到 Flux 中
                                String[] lines = responseBody.string().split("\n");
                                for (String line : lines) {
                                    emitter.next(line);
                                }
                                // 数据发布完成,发送完成信号
                                emitter.complete();
                            } else {
                                // 处理响应不成功的情况
                                String errorMessage = "请求 API 失败,响应码: " + response.code();
                                System.err.println(errorMessage);
                                emitter.error(new IOException(errorMessage));
                            }
                        }
                    }
                });
            } catch (Exception e) {
                // 处理请求过程中发生的异常
                System.err.println("请求 API 时发生异常: " + e.getMessage());
                emitter.error(e);
            }
        });
    }

    public static void main(String[] args) {
        // 构建请求
        Request apiRequest = new Request.Builder()
               .url("https://example.com/api") // 替换为实际的 API 地址
               .build();

        // 调用 makeApiCall 方法获取 Flux
        Flux<String> responseFlux = makeApiCall(apiRequest);

        // 订阅 Flux 并处理响应数据
        responseFlux.subscribe(
                // 处理每个接收到的元素
                line -> System.out.println("Received: " + line),
                // 处理错误
                error -> System.err.println("Error: " + error.getMessage()),
                // 处理完成信号
                () -> System.out.println("API 请求处理完成")
        );
    }
}
代码解释
  1. makeApiCall 方法
    • 该方法接收一个 Request 对象作为参数,返回一个 Flux<String> 对象。
    • 使用 Flux.create 创建 Flux,在其回调中使用 OkHttp3 的 enqueue 方法发起异步请求。
    • onFailure 方法:当请求失败时,打印错误信息并调用 emitter.error 方法将错误信号发送给 Flux 的订阅者。
    • onResponse 方法
      • 若响应成功且响应体不为空,将响应体按行分割,逐行调用 emitter.next 方法将每行数据发布到 Flux 中。
      • 数据发布完成后,调用 emitter.complete 方法发送完成信号。
      • 若响应不成功,打印错误信息并调用 emitter.error 方法发送错误信号。
  2. main 方法
    • 构建一个 Request 对象,指定要请求的 API 地址。
    • 调用 makeApiCall 方法获取 Flux
    • 使用 subscribe 方法订阅 Flux,处理接收到的元素、错误和完成信号。
注意事项
  • 请将 https://example.com/api 替换为实际的 API 地址。
  • 该示例假设响应是逐行文本数据,你可以根据实际情况调整数据处理逻辑。
  • 在实际应用中,需要注意资源管理,例如关闭 OkHttp 客户端等。

🎉🎉🎉🎉🎉🎉
欢迎访问的个人博客:https://swzbk.site/,加好友,拉你入福利群
🎉🎉🎉🎉🎉🎉

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2318503.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于springboot医疗平台系统(源码+lw+部署文档+讲解),源码可白嫖!

摘要 信息化时代&#xff0c;各行各业都以网络为基础飞速发展&#xff0c;而医疗服务行业的发展却进展缓慢&#xff0c;传统的医疗服务行业已经逐渐不满足民众的需求&#xff0c;有些还在以线下预约挂号的方式接待病人&#xff0c;为此设计一个医疗平台系统很有必要。此类系统…

Stable Diffusion lora训练(一)

一、不同维度的LoRA训练步数建议 2D风格训练 数据规模&#xff1a;建议20-50张高质量图片&#xff08;分辨率≥10241024&#xff09;&#xff0c;覆盖多角度、多表情的平面风格。步数范围&#xff1a;总步数控制在1000-2000步&#xff0c;公式为 总步数 Repeat Image Epoch …

网络空间安全(37)获取webshell方法总结

一、直接上传获取Webshell 这是最常见且直接的方法&#xff0c;利用网站对上传文件的过滤不严或存在漏洞&#xff0c;直接上传Webshell文件。 常见场景&#xff1a; 许多PHP和JSP程序存在此类漏洞。例如&#xff0c;一些论坛系统允许用户上传头像或心情图标&#xff0c;攻击者可…

第十三次CCF-CSP认证(含C++源码)

第十三次CCF-CSP认证 跳一跳满分题解 碰撞的小球满分题解遇到的问题 棋局评估满分题解 跳一跳 题目链接 满分题解 没什么好说的 基本思路就是如何用代码翻译题目所给的一些限制&#xff0c;以及变量应该如何更新&#xff0c;没像往常一样给一个n&#xff0c;怎么读入数据&…

swagger ui 界面清除登录信息的办法

我们在开发过程中&#xff0c;用swagger ui 测试接口的时候&#xff0c;可能会要修改当前登录的用户。 但是如果我们在谷歌中对调试的本地swagger ui 登录地址存储过账户密码&#xff0c;每次启动项目调试之后&#xff0c;都会自动登录swagger ui &#xff0c;登录界面一闪就…

TensorFlow 的基本概念和使用场景

TensorFlow 是一个由 Google 开发的开源机器学习框架&#xff0c;主要用于构建和训练深度学习模型。下面是一些 TensorFlow 的基本概念和使用场景&#xff1a; 基本概念&#xff1a; 张量&#xff08;Tensor&#xff09;&#xff1a;在 TensorFlow 中&#xff0c;数据以张量的…

基于x11vnc的ubuntu远程桌面

1、安装VNC服务 sudo apt install x11vnc -y2、创建连接密码 sudo x11vnc -storepasswd3、安装lightdm服务 x11vnc 在 默认的 GDM3 中不起作用&#xff0c;因此需要使用 lightdm 桌面管理环境 sudo apt install lightdm -y切换至lightdm&#xff0c;上一步已经切换则跳过该…

Cursor解锁Claude Max,助力AI编程新突破!

Cursor 最新推出的 Claude Max 模型&#xff0c;以其卓越的性能和创新的能力&#xff0c;正在重新定义我们对 AI 辅助编程的认知。这款搭载 Claude3.7 大脑的超级模型&#xff0c;不仅具备超强智能&#xff0c;还凭借一系列技术突破&#xff0c;向传统 AI 编程工具发起了挑战。…

ESP8266 与 ARM7 接口-LPC2148 创建 Web 服务器以控制 LED

ESP8266 与 ARM7 接口-LPC2148 创建 Web 服务器以控制 LED ESP8266 Wi-Fi 收发器提供了一种将微控制器连接到网络的方法。它被广泛用于物联网项目,因为它便宜、体积小且易于使用。 在本教程中,我们将 ESP8266 Wi-Fi 模块与 ARM7-LPC2148 微控制器连接,并创建一个 Web 服务…

通过C#脚本更改材质球的参数

// 设置贴图Texture mTexture Resources.Load("myTexture", typeof(Texture )) as Texture;material.SetTexture("_MainTex", mTexture );// 设置整数material.SetInt("_Int", 1);// 设置浮点material.SetFloat("_Float", 0.1f);// 设…

FPGA管脚约束

目录 前言 一、IO约束 二、延迟约束 前言 IO约束包括管脚约束和延迟约束。 一、IO约束 对管脚进行约束&#xff0c;对应的约束语句&#xff1a; set_property -dict {PACKAGE_PIN AJ16 IOSTANDARD LVCMOS18} [get_ports "led[0]" ] 上面是单端的管脚&…

实现前端.ttf字体包的压缩

前言 平常字体包都有1M的大小&#xff0c;所以网络请求耗时会比较长&#xff0c;所以对字体包的压缩也是前端优化的一个点。但是前端如果想要特点字符打包成字体包&#xff0c;网上查阅资料后&#xff0c;都是把前端代码里面的字符获取&#xff0c;但是对于动态的内容&#xf…

uni-app集成保利威直播、点播SDK经验FQ(二)|小程序直播/APP直播开发适用

通过uniapp集成保利威直播、点播SDK来开发小程序/APP的视频直播能力&#xff0c;在实际开发中可能会遇到的疑问和解决方案&#xff0c;下篇。更多疑问请咨询19924784795。 1.ios不能后台挂起uniapp插件 ios端使用后台音频播放和画中画功能&#xff0c;没有在 manifest.json 进…

Sensodrive机器人力控关节模组SensoJoint在海洋垃圾清理机器人中的拓展应用

海洋污染已成为全球性的环境挑战&#xff0c;其中海底垃圾的清理尤为困难。据研究&#xff0c;海洋中约有2600万至6600万吨垃圾&#xff0c;超过90%沉积在海底。传统上&#xff0c;潜水员收集海底垃圾不仅成本高昂&#xff0c;而且充满风险。为解决这一问题&#xff0c;欧盟资助…

Git的基本指令

一、回滚 1.git init 在项目文件夹中打开bash生成一个.git的子目录&#xff0c;产生一个仓库 2.git status 查看当前目录下的所有文件的状态 3.git add . 将该目录下的所有文件提交到暂存区 4.git add 文件名 将该目录下的指定文件提交到暂存区 5.git commit -m 备注信…

Vitis 2024.1 无法正常编译custom ip的bug(因为Makefile里的wildcard)

现象&#xff1a;如果在vivado中&#xff0c;添加了自己的custom IP&#xff0c;比如AXI4 IP&#xff0c;那么在Vitis&#xff08;2024.1&#xff09;编译导出的原本的.xsa的时候&#xff0c;会构建build失败。报错代码是&#xff1a; "Compiling blank_test_ip..."…

Elasticsearch 在航空行业:数据管理的游戏规则改变者

作者&#xff1a;来自 Elastic Adam La Roche 数字化客户体验不再是奢侈品&#xff0c;而是欧洲航空公司必不可少的需求。它推动了客户满意度&#xff0c;提升了运营效率&#xff0c;并创造了可持续的竞争优势。随着行业的不断发展&#xff0c;优先投资前沿数字技术和平台的航空…

DeepSeek 模型的成本效益深度解析:低成本、高性能的AI新选择

网罗开发 &#xff08;小红书、快手、视频号同名&#xff09; 大家好&#xff0c;我是 展菲&#xff0c;目前在上市企业从事人工智能项目研发管理工作&#xff0c;平时热衷于分享各种编程领域的软硬技能知识以及前沿技术&#xff0c;包括iOS、前端、Harmony OS、Java、Python等…

利用knn算法实现手写数字分类

利用knn算法实现手写数字分类 1.作者介绍2.KNN算法2.1KNN&#xff08;K-Nearest Neighbors&#xff09;算法核心思想2.2KNN算法的工作流程2.3优缺点2.4 KNN算法图示介绍 3.实验过程3.1安装所需库3.2 MNIST数据集3.3 导入手写数字图像进行分类3.4 完整代码3.5 实验结果 1.作者介…

基于springboot+vue的调查问卷平台

一、系统架构 前端&#xff1a;vue | element-ui | echarts 后端&#xff1a;springboot | mybatis-plus 环境&#xff1a;jdk1.8 | mysql | maven 二、代码及数据 三、功能介绍 01. 注册 02. 登录 03. web端-问卷中心 04. web端-文章中心 05. 管理端-…