Spring WebFlux 中 WebSocket 使用 DataBuffer 的注意事项

news2025/3/6 11:33:06

以下是修改后的完整文档,包含在多个多线程环境中使用 retain()release() 方法的示例,且确保在 finally 块中调用 release()


在 Spring WebFlux 中,WebSocketMessage 主要用于表示 WebSocket 的消息载体,其中 getPayload() 方法返回 DataBuffer,用于处理二进制数据流。在使用 DataBuffer 时,需要注意其一次性读取特性,以及潜在的内存管理问题。本文将介绍如何正确使用 DataBuffer,避免重复读取和内存泄漏。

1. 避免重复读取 DataBuffer

DataBuffer 设计为一次性读取流数据,因此,一旦被消费,后续读取将无法获取数据。例如:

String firstRead = webSocketMessage.getPayload().toString(StandardCharsets.UTF_8);
String secondRead = webSocketMessage.getPayload().toString(StandardCharsets.UTF_8); // 此处读取会失败

解决方案

如果需要多次使用 DataBuffer 的数据,可以在第一次读取时缓存:

DataBuffer dataBuffer = webSocketMessage.getPayload();
byte[] bytes = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(bytes);
String payload = new String(bytes, StandardCharsets.UTF_8);

这样,后续可以安全地使用 payload 变量,而不会影响 DataBuffer


2. 避免阻塞操作

Spring WebFlux 是基于响应式编程的,WebSocket 处理也应保持非阻塞。如果在 DataBuffer 处理中引入了阻塞操作(如同步 I/O 或 Thread.sleep()),可能会导致 Reactor 线程阻塞,影响整体吞吐量。

解决方案

使用 Flux/Mono 进行异步处理,例如:

session.receive()
    .map(WebSocketMessage::getPayloadAsText)  // 避免直接操作 DataBuffer
    .flatMap(payload -> processMessage(payload))
    .subscribe();

3. 处理 DataBuffer 可能带来的内存泄漏

Spring WebFlux 采用 Netty 作为默认底层引擎,而 Netty 的 ByteBuf 需要手动释放,否则可能导致内存泄漏。Spring 提供了 DataBufferUtils.release() 方法来避免 DataBuffer 占用资源不被回收。

正确的释放方式

session.receive()
    .doOnNext(message -> {
        try {
            String data = message.getPayloadAsText();
            System.out.println("Received: " + data);
        } finally {
            DataBufferUtils.release(message.getPayload());
        }
    })
    .subscribe();

DataBufferUtils.release() 仅在手动管理 DataBuffer 生命周期时才需要,如果直接通过 WebSocketMessage.getPayloadAsText() 处理字符串,不必显式释放。


4. 在 Flux/Mono 组合操作时避免数据丢失

如果 DataBuffermap() 操作多次消费,可能导致数据丢失或 DataBuffer 为空。例如:

session.receive()
    .map(message -> {
        DataBuffer payload = message.getPayload();
        DataBufferUtils.release(payload); // 这里释放后,后续的 map() 操作会读取不到数据
        return payload;
    })
    .map(buffer -> buffer.toString(StandardCharsets.UTF_8)) // 这里可能会失败
    .subscribe();

正确的方式

  • 确保 DataBuffer 只在最终消费时释放。
  • 处理 DataBuffer 时,转换为 byte[] 以避免流式数据的重复读取。
session.receive()
    .map(WebSocketMessage::getPayload)
    .map(dataBuffer -> {
        byte[] bytes = new byte[dataBuffer.readableByteCount()];
        dataBuffer.read(bytes);
        DataBufferUtils.release(dataBuffer);  // 读取完毕后释放
        return new String(bytes, StandardCharsets.UTF_8);
    })
    .subscribe(System.out::println);

5. retain()release() 方法的补充

Spring WebFlux 中,WebSocketMessage 还提供了 retain()release() 方法,用于管理 DataBuffer 的引用计数和释放资源。下面介绍如何在多线程环境中正确使用这些方法。

retain() 方法

retain() 方法确保 DataBuffer 的引用计数增加,以便在需要时能够安全使用:

public WebSocketMessage retain() {
    if (reactorNetty2Present) {
        return ReactorNetty2Helper.retain(this);
    }
    DataBufferUtils.retain(this.payload);
    return this;
}

retain() 方法会增加 DataBuffer 的引用计数,防止在处理过程中被提前释放。这对于需要多个组件共享同一 DataBuffer 实例的情况非常重要。

release() 方法

release() 方法用于释放 DataBuffer,减少引用计数,释放底层资源,防止内存泄漏:

public void release() {
    DataBufferUtils.release(this.payload);
}

release() 方法通常在处理完成后调用,确保底层的 DataBuffer 被正确释放。

使用示例:在多线程环境中使用 retain() 和 release()

在 WebSocket 消息处理时,确保在多线程环境中正确管理 DataBuffer 的生命周期。示例如下,使用 retain() 保证资源被正确引用,并在 finally 块中调用 release() 确保即使出现异常时也会释放资源:

session.receive()
    .doOnNext(message -> {
        // 在多线程环境中保留引用
        message.retain();
        try {
            String data = message.getPayloadAsText();
            System.out.println("Received: " + data);
            
            // 模拟处理过程,可能会涉及多线程操作
            // 例如:通过某个线程池处理消息
            processMessageAsync(data);

        } finally {
            // 确保释放资源
            message.release();  // 释放资源
        }
    })
    .subscribe();

在上面的示例中,retain() 确保了 DataBuffer 在多个线程中可以安全访问,直到最终的 release() 被调用来释放资源。无论操作成功与否,finally 块中的 release() 都会被执行,确保不会发生内存泄漏。


6. 总结

在 Spring WebFlux 中使用 WebSocketMessageDataBuffer 需要注意以下几点:

  1. 避免重复读取 DataBuffer,建议在读取后缓存数据。
  2. 避免阻塞操作,尽量使用 Flux/Mono 进行异步处理。
  3. 防止内存泄漏,在手动管理 DataBuffer 生命周期时使用 DataBufferUtils.release() 释放资源。
  4. 确保 DataBuffer 只在最终消费时释放,避免 Flux 流程中数据丢失。
  5. 使用 retain()release() 方法 来管理 DataBuffer 的引用计数,确保资源的正确释放,特别是在多线程环境中,确保在 finally 中释放资源。

通过遵循这些实践,可以有效地管理 WebSocket 消息的内存使用,并提高应用的性能和可靠性。


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2310513.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Android ChatOn-v1.66.536-598-[构建于ChatGPT和GPT-4o之上]

ChatOn 链接:https://pan.xunlei.com/s/VOKYnq-i3C83CK-HJ1gfLf4gA1?pwdwzwc# 添加了最大无限积分 删除了所有调试信息 语言:全语言支持

游戏树搜索与优化策略:Alpha-Beta剪枝及其实例分析

1.Alpha-Beta搜索 Alpha-Beta 搜索是一种用于对抗性游戏(比如象棋、围棋)的智能算法,目的是帮助计算机快速找到“最优走法”,同时避免不必要的计算。它的核心思想是:通过剪掉明显糟糕的分支,大幅减少需要计…

基于Qwen-VL的手机智能体开发

先上Demo: vl_agent_demo 代码如下: 0 设置工作目录: 你的工作目录需要如下: 其中utils文件夹和qwenvl_agent.py均参考自 GitHub - QwenLM/Qwen2.5-VL: Qwen2.5-VL is the multimodal large language model series developed by …

记录一次Spring事务失效导致的生产问题

一、背景介绍 公司做的是“聚合支付”业务,对接了微信、和包、数字人民币等等多家支付机构,我们提供统一的支付、退款、自动扣款签约、解约等能力给全国的省公司、机构、商户等。 同时,需要做对账功能,即支付机构将对账文件给到…

算法 之 贪心思维训练!

文章目录 从最大/最小开始贪心2279.装满石头的背包的最大数量2971.找到最大周长的多边形 从最左、最右开始贪心2712.使所有字符相等的最小成本 划分型贪心1221.分割平衡字符串 贪心策略在处理一些题目的时候能够带来意想不到的效果 从最小/最大开始贪心,优先考虑最小…

大语言模型学习--LangChain

LangChain基本概念 ReAct学习资料 https://zhuanlan.zhihu.com/p/660951271 LangChain官网地址 Introduction | 🦜️🔗 LangChain LangChain是一个基于语言模型开发应用程序的框架。它可以实现以下应用程序: 数据感知:将语言模型…

【PCIe 总线及设备入门学习专栏 4.5 -- PCIe 中断 MSI 与 MSI-X 机制介绍】

文章目录 PCI 设备中断机制PCIe 设备中断机制PCIe MSI 中断机制MSI CapabilityMSI-X 中断机制MSI-X capabilityMSI-X TablePBAMSI-X capability 解析MSI/MSI-X 操作流程扫描设备配置设备MSI 配置MSI-X 配置中断触发与处理PCI 设备中断机制 以前的PCI 设备是支持 物理上的 INTA…

wxWidgets GUI 跨平台 入门学习笔记

准备 参考 https://wiki.wxwidgets.org/Microsoft_Visual_C_NuGethttps://wiki.wxwidgets.org/Tools#Rapid_Application_Development_.2F_GUI_Buildershttps://docs.wxwidgets.org/3.2/https://docs.wxwidgets.org/latest/overview_helloworld.htmlhttps://wizardforcel.gitb…

OpenMCU(一):STM32F407 FreeRTOS移植

概述 本文主要描述了STM32F407移植FreeRTOS的简要步骤。移植描述过程中,忽略了Keil软件的部分使用技巧。默认读者熟练使用Keil软件。本文的描述是基于OpenMCU_FreeRTOS这个工程,该工程已经下载放好了移植stm32f407 FreeRTOS的所有文件 OpenMCU_FreeRTOS工…

[自动驾驶-传感器融合] 多激光雷达的外参标定

文章目录 引言外参标定原理ICP匹配示例参考文献 引言 多激光雷达系统通常用于自动驾驶或机器人,每个雷达的位置和姿态不同,需要将它们的数据统一到同一个坐标系下。多激光雷达外参标定的核心目标是通过计算不同雷达坐标系之间的刚性变换关系&#xff08…

JavaScript 知识点整理

1. 什么是AST?它在前端有哪些应用场景? AST Abstract Syntax Tree抽象语法树,用于表达源码的树形结构 应用: Babel:一个广泛使用的 JS 编译器,将ES6 或 JSX 等现代语法转换为兼容性较好的 ES5 代码。Esl…

鸿蒙与DeepSeek深度整合:构建下一代智能操作系统生态

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站。 https://www.captainbed.cn/north 目录 技术融合背景与价值鸿蒙分布式架构解析DeepSeek技术体系剖析核心整合架构设计智能调度系统实现…

利用行波展开法测量横观各向同性生物组织的生物力学特性|文献速递-医学影像人工智能进展

Title 题目 Measurement of biomechanical properties of transversely isotropic biological tissue using traveling wave expansion 利用行波展开法测量横观各向同性生物组织的生物力学特性 01 文献速递介绍 纤维嵌入结构在自然界中普遍存在。从脑白质(罗曼…

AR配置静态IP双链路负载分担示例

AR配置静态IP双链路负载分担示例 适用于大部分企业网络出口 业务需求: 运营商1分配的接口IP为100.100.1.2,子网掩码为255.255.255.252,网关IP为100.100.1.1。 运营商2分配的接口IP为200.200.1.2,子网掩码为255.255.255.248&am…

文件操作(详细讲解)(1/2)

你好这里是我说风俗,希望各位客官点点赞,收收藏,关关注,各位对我的支持是我持续更新的动力!!!!第二期会马上更的关注我获得最新消息哦!!!&#xf…

[AI]从零开始的so-vits-svc歌声推理及混音教程

一、前言 在之前的教程中已经为大家讲解了如何安装so-vits-svc以及使用现有的模型进行文本转语音。可能有的小伙伴就要问了,那么我们应该怎么使用so-vits-svc来进行角色歌曲的创作呢?其实歌曲的创作会相对麻烦一些,会使用到好几个软件&#x…

SpringMVC控制器定义:@Controller注解详解

文章目录 引言一、Controller注解基础二、RequestMapping与请求映射三、参数绑定与数据校验四、RestController与RESTful API五、控制器建议与全局处理六、控制器测试策略总结 引言 在SpringMVC框架中,控制器(Controller)是整个Web应用的核心组件,负责处…

免费分享一个软件SKUA-GOCAD-2022版本

若有需要,可以下载。 下载地址 通过网盘分享的文件:Paradigm SKUA-GOCAD 22 build 2022.06.20 (x64).rar 链接: https://pan.baidu.com/s/10plenNcMDftzq3V-ClWpBg 提取码: tm3b 安装教程 Paradigm SKUA-GOCAD 2022版本v2022.06.20安装和破解教程-CS…

学习threejs,使用LineBasicMaterial基础线材质

👨‍⚕️ 主页: gis分享者 👨‍⚕️ 感谢各位大佬 点赞👍 收藏⭐ 留言📝 加关注✅! 👨‍⚕️ 收录于专栏:threejs gis工程师 文章目录 一、🍀前言1.1 ☘️THREE.LineBasicMaterial1.…

java面试题(一)基础部分

1.【String】StringBuffer和StringBuilder区别? String对象是final修饰的不可变的。对String对象的任何操作只会生成新对象,不会对原有对象进行操作。 StringBuilder和StringBuffer是可变的。 其中StringBuilder线程不安全,但开销小。 St…