Websocket如何分块处理数据量超大的消息体

news2025/1/20 5:48:34

在这里插入图片描述

若我们服务端一次性最大处理的字节数是1M,而客户端发来了2M的数据,此时服务端的数据就要被切割成两次传输解码。Http协议中有分块传输,而在Websocket也可以分块处理超大的消息体。在jsr356标准中使用javax.websocket.MessageHandler.Partial可以分块处理这种数据。

 interface Partial<T> extends MessageHandler {

        /**
         * Called when part of a message is available to be processed.
         *
         * @param messagePart   The message part
         * @param last          <code>true</code> if this is the last part of
         *                      this message, else <code>false</code>
         */
        void onMessage(T messagePart, boolean last);
    }

Partial接口中的参数last就是表示是否是最后一块分块数据。

我们即可以给WsContainer全局设置消息体的缓冲池大小,也可以给每个session单独设置消息体的缓冲池大小。发送消息体一旦超过了此大小,数据就会在服务端被分块传输解码。

 //全局设置消息体的缓冲池大小
    @Bean  
    public WebSocketContainerFactoryBean webSocketContainer(){
        WebSocketContainerFactoryBean factoryBean = new WebSocketContainerFactoryBean();
        factoryBean.setMaxTextMessageBufferSize(20);
//        factoryBean.setMaxSessionIdleTimeout(10*1000);
//        factoryBean.setMaxBinaryMessageBufferSize(1000);
        return factoryBean;
    }

	//session级别消息体的缓冲池大小
	 @OnOpen
    public void onOpen(Session session, @PathParam("token") String token, EndpointConfig config) throws IOException {
        session.setMaxTextMessageBufferSize(20);
        //....
  }      

使用JSR356注解实现消息体分块传输


@ServerEndpoint(value = "/ws/{token}")
@Component
public class WebsocketHandler2 {
    private final static Logger log = LoggerFactory.getLogger(WebsocketHandler2.class);
    private static final Map<String, StringBuilder> dataCache = new ConcurrentHashMap<>();
    @OnOpen
    public void onOpen(Session session, @PathParam("token") String token, EndpointConfig config) throws IOException {
        session.setMaxTextMessageBufferSize(20);
    }

    @OnMessage
    public void onMessage(String partialMsg, Session session, boolean isLast) throws IOException {
        StringBuilder stringJoiner = dataCache.get(session.getId());
        if (isLast) {
            log.info("receive client(id={}) partial last msg=>{}", session.getId(), partialMsg);
            if (stringJoiner == null) {
                String msg = String.format("reply your(id=%s) msg=>%s", session.getId(), partialMsg);
                session.getBasicRemote().sendText(msg);
            } else {
                dataCache.remove(session.getId());
                stringJoiner.append(partialMsg);
                String msg = String.format("reply your(id=%s) msg=>%s", session.getId(), stringJoiner);
                session.getBasicRemote().sendText(msg);
            }
        } else {
            log.info("receive client(id={}) partial non_last msg=>{}", session.getId(), partialMsg);
            if (stringJoiner == null) {
                stringJoiner = new StringBuilder(partialMsg);
                dataCache.put(session.getId(), stringJoiner);
            } else {
                stringJoiner.append(partialMsg);
            }
        }

    }

使用spring的WebSocketHandler实现消息体分块传输

@Component
public class WebsocketHandler1 extends TextWebSocketHandler {
    private final Logger log = LoggerFactory.getLogger(getClass());
    private static final Map<String, StringBuilder> dataCache = new ConcurrentHashMap<>();


    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
   		 session.setTextMessageSizeLimit(20);
    }


    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        StringBuilder stringJoiner = dataCache.get(session.getId());
        if (message.isLast()) {
            log.info("receive client(id={}) partial last msg=>{}", session.getId(), message.getPayload());
            if (stringJoiner == null) {
                TextMessage msg = new TextMessage(String.format("reply your(id=%s) msg=>%s", session.getId(), message.getPayload()));
                session.sendMessage(msg);
            } else {
                dataCache.remove(session.getId());
                stringJoiner.append(message.getPayload());
                TextMessage msg = new TextMessage(String.format("reply your(id=%s) msg=>%s", session.getId(), stringJoiner));
                session.sendMessage(msg);
            }
        } else {
            log.info("receive client(id={}) partial non_last msg=>{}", session.getId(), message.getPayload());
            if (stringJoiner == null) {
                stringJoiner = new StringBuilder(message.getPayload());
                dataCache.put(session.getId(), stringJoiner);
            } else {
                stringJoiner.append(message.getPayload());
            }
        }

    }
    @Override
    public boolean supportsPartialMessages() {
        return true;
    }

注意上边的WebsocketHandler1 实现的抽象方法supportsPartialMessages其返回值必须是true,否则处理大消息体时会报错。这是因为StandardWebSocketHandlerAdapter会根据supportsPartialMessages方法返回值将我们的WebSocketHandler适配成MessageHandler.PartialMessageHandler.Whole,而supportsPartialMessages返回值是false就会适配成MessageHandler.WholeMessageHandler.Whole是无法处理分块消息体的。

	//StandardWebSocketHandlerAdapter
	@Override
	public void onOpen(final javax.websocket.Session session, EndpointConfig config) {
		this.wsSession.initializeNativeSession(session);

		// The following inner classes need to remain since lambdas would not retain their
		// declared generic types (which need to be seen by the underlying WebSocket engine)

		if (this.handler.supportsPartialMessages()) {
			session.addMessageHandler(new MessageHandler.Partial<String>() {
				@Override
				public void onMessage(String message, boolean isLast) {
					handleTextMessage(session, message, isLast);
				}
			});
			session.addMessageHandler(new MessageHandler.Partial<ByteBuffer>() {
				@Override
				public void onMessage(ByteBuffer message, boolean isLast) {
					handleBinaryMessage(session, message, isLast);
				}
			});
		}
		else {
			session.addMessageHandler(new MessageHandler.Whole<String>() {
				@Override
				public void onMessage(String message) {
					handleTextMessage(session, message, true);
				}
			});
			session.addMessageHandler(new MessageHandler.Whole<ByteBuffer>() {
				@Override
				public void onMessage(ByteBuffer message) {
					handleBinaryMessage(session, message, true);
				}
			});
		}
		//......
	}

websocket底层是怎么处理分块数据的?我们在方法org.apache.tomcat.websocket.WsFrameBase#processDataText可以看到其具体的处理逻辑。
首先将接收到的二进制数据尝试解码成文本数据,若发现接收缓冲区messageBufferText容量不足则查到分块处理器,若存在分块处理器泽调用sendMessageTex(false),先处理部分数据,若不存在则直接抛出异常。

//org.apache.tomcat.websocket.server.WsFrameBase
	private boolean processDataText() throws IOException {
        // Copy the available data to the buffer
        TransformationResult tr = transformation.getMoreData(opCode, fin, rsv, messageBufferBinary);
        while (!TransformationResult.END_OF_FRAME.equals(tr)) {
             //...
        }
        messageBufferBinary.flip();
        boolean last = false;
        // Frame is fully received
        // Convert bytes to UTF-8
        while (true) {
            CoderResult cr = utf8DecoderMessage.decode(messageBufferBinary, messageBufferText,
                    last);
            if (cr.isError()) {
                throw new WsIOException(new CloseReason(
                        CloseCodes.NOT_CONSISTENT,
                        sm.getString("wsFrame.invalidUtf8")));
            } else if (cr.isOverflow()) {
                // Ran out of space in text buffer - flush it
                //尝试解码时发现接收缓冲区messageBufferText容量不足
                //调用sendMessageTex(false),先处理部分数据。
                if (usePartial()) {   //查找分块处理器
                    messageBufferText.flip();
                    sendMessageText(false);
                    messageBufferText.clear();
                } else { //没有分块处理器,就会抛出异常
                    throw new WsIOException(new CloseReason(
                            CloseCodes.TOO_BIG,
                            sm.getString("wsFrame.textMessageTooBig")));
                }
            } else if (cr.isUnderflow() && !last) {
                // End of frame and possible message as well.

                if (continuationExpected) {
                    // If partial messages are supported, send what we have
                    // managed to decode
                    if (usePartial()) {
                        messageBufferText.flip();
                        sendMessageText(false);
                        messageBufferText.clear();
                    }
                    messageBufferBinary.compact();
                    newFrame();
                    // Process next frame
                    return true;
                } else {
                    // Make sure coder has flushed all output
                    last = true;
                }
            } else {
                // End of message
                messageBufferText.flip();
                //处理最后一块消息
                sendMessageText(true);

                newMessage();
                return true;
            }
        }
	}      
	//确定是否支持分块处理
	private boolean usePartial() {
        if (Util.isControl(opCode)) {
            return false;
        } else if (textMessage) {
            return textMsgHandler instanceof MessageHandler.Partial;
        } else {
            // Must be binary
            return binaryMsgHandler instanceof MessageHandler.Partial;
        }
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2243229.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

论文复现_How Machine Learning Is Solving the Binary Function Similarity Problem

1. 内容概述 前言&#xff1a;此代码库支持 USENIX Security 22 论文 《How Machine Learning Is Solving the Binary Function Similarity Problem》&#xff0c;作者包括 Andrea Marcelli 等人&#xff0c;提供了相关代码、数据集和技术细节。 关键内容&#xff1a;技术报告…

【视觉SLAM】2-三维空间刚体运动的数学表示

读书笔记&#xff1a;学习空间变换的三种数学表达形式。 文章目录 1. 旋转矩阵1.1 向量运算1.2 坐标系空间变换1.3 变换矩阵与齐次坐标 2. 旋转向量和欧拉角2.1 旋转向量2.2 欧拉角 3. 四元数 1. 旋转矩阵 1.1 向量运算 对于三维空间中的两个向量 a , b ∈ R 3 a,b \in \R^3 …

【WPF】Prism学习(六)

Prism Dependency Injection 1.依赖注入&#xff08;Dependency Injection&#xff09; 1.1. Prism与依赖注入的关系&#xff1a; Prism框架一直围绕依赖注入构建&#xff0c;这有助于构建可维护和可测试的应用程序&#xff0c;并减少或消除对静态和循环引用的依赖。 1.2. P…

多账号登录管理器(淘宝、京东、拼多多等)

目录 下载安装与运行 解决什么问题 功能说明 目前支持的平台 功能演示 登录后能保持多久 下载安装与运行 下载、安装与运行 语雀 解决什么问题 多个账号的快捷登录与切换 功能说明 支持多个电商平台支持多个账号的登录保持支持快捷切换支持导入导出支持批量删除支持…

UniAPP快速入门教程(一)

一、下载HBuilder 首先需要下载HBuilder开发工具&#xff0c;下载地址:https://www.dcloud.io/hbuilderx.htmlhttps://www.dcloud.io/hbuilder.html 选择Windows正式版.zip文件下载。下载解压后直接运行解压目录里的HBuilderX.exe就可以启动HBuilder。 UniApp的插件市场网址…

PyAEDT:Ansys Electronics Desktop API 简介

在本文中&#xff0c;我将向您介绍 PyAEDT&#xff0c;这是一个 Python 库&#xff0c;旨在增强您对 Ansys Electronics Desktop 或 AEDT 的体验。PyAEDT 通过直接与 AEDT API 交互来简化脚本编写&#xff0c;从而允许在 Ansys 的电磁、热和机械求解器套件之间无缝集成。通过利…

SpringBoot源码解析(四):解析应用参数args

SpringBoot源码系列文章 SpringBoot源码解析(一)&#xff1a;SpringApplication构造方法 SpringBoot源码解析(二)&#xff1a;引导上下文DefaultBootstrapContext SpringBoot源码解析(三)&#xff1a;启动开始阶段 SpringBoot源码解析(四)&#xff1a;解析应用参数args 目录…

【Linux】指令 + 重定向操作

Linux基本指令 一.Linux基本指令1.mv&#xff08;重要&#xff09;2.cat3.more和less&#xff08;重要&#xff09;4.head和tail5.date6.cal7.find&#xff08;重要&#xff09; 二.Linux相关知识点1. Linux系统中&#xff1a;一切皆文件2. 重定向操作1. 输出重定向2. 追加重定…

【精通 Readline 库】:优化 Shell 外壳程序的艺术

&#x1f4c3;博客主页&#xff1a; 小镇敲码人 &#x1f49a;代码仓库&#xff0c;欢迎访问 &#x1f680; 欢迎关注&#xff1a;&#x1f44d;点赞 &#x1f442;&#x1f3fd;留言 &#x1f60d;收藏 &#x1f30f; 任尔江湖满血骨&#xff0c;我自踏雪寻梅香。 万千浮云遮碧…

ESP-IDF VScode 项目构建/增加组件 新手友好!!!

项目构建 1.新建文件夹&#xff0c;同时在该文件夹内新建.c和.h文件 如图所示&#xff0c;在components中新建ADC_User.c、ADC_User.h、CMakeLists.txt文件。当然这里你也可以不在components文件夹内新建文件&#xff0c;下面会说没有在components文件夹内新建文件构建项目的方…

玩转N1盒子:速刷OpenWRT软路由系统并实现公网访问管理

文章目录 前言1. 制作刷机固件U盘1.1 制作刷机U盘需要准备以下软件&#xff1a;1.2 制作步骤 2. N1盒子降级与U盘启动2.1 N1盒子降级2.2 N1盒子U盘启动设置2.3 使用U盘刷入OpenWRT2.4 OpenWRT后台IP地址修改2.5 设置旁路由&无线上网 3. 安装cpolar内网穿透3.1 下载公钥3.2 …

机器学习4

九、线性回归 1、概念 假设存在多个点&#xff0c;需要使用一条线来保障尽量拟合这些点&#xff0c;寻找这条线就叫回归。 机器学习中一种有监督学习的算法,回归问题主要关注的是因变量(需要预测的值)和一个或多个数值型的自变量(预测变量)之间的关系。 2、损失函数 存…

【Java EE初阶---多线程(初阶)】初识计算机

乐观学习&#xff0c;乐观生活&#xff0c;才能不断前进啊&#xff01;&#xff01;&#xff01; 我的主页&#xff1a;optimistic_chen 我的专栏&#xff1a;c语言 &#xff0c;Java &#xff0c;Java数据结构 欢迎大家访问~ 创作不易&#xff0c;大佬们点赞鼓励下吧~ 文章目录…

网络安全之国际主流网络安全架构模型

目前&#xff0c;国际主流的网络安全架构模型主要有&#xff1a; ● 信息技术咨询公司Gartner的ASA&#xff08;Adaptive Security Architecture自适应安全架构&#xff09; ● 美国政府资助的非营利研究机构MITRE的ATT&CK&#xff08;Adversarial Tactics Techniques &…

生成式人工智能(AIGC)在软件开发设计模式课程教学中的应用

一、引言 软件设计模式作为软件工程领域的核心组成部分&#xff0c;对于提升软件系统的质量和可维护性至关重要。然而&#xff0c;传统的软件设计模式课程教学方法面临着诸多挑战&#xff0c;例如教师准备教学案例的过程繁琐&#xff0c;学生理解和应用具体案例难度较大&#…

丹摩征文活动|摩智算平台深度解析:Faster R-CNN模型的训练与测试实战

目录 文章前言Faster R-CNN的简介Faster RCNN的训练与测试提前准备1.1 mobaxterm&#xff08;远程连接服务器&#xff09;1.2 本文的源码下载 目标检测模型 Faster-Rcnn2.1云服务器平台 数据上传内置JupyterLab的使用本地连接使用DAMODEL实例获取实例的SSH访问信息通过SSH连接通…

【实用教程】如何利用 JxBrowser 在 Kotlin 中实现屏幕共享

JxBrowser是一个跨平台的 JVM 库&#xff0c;它允许您将基于 Chromium 的 Browser 控件集成到 Compose、Swing、JavaFX、SWT 应用程序中&#xff0c;并使用 Chromium 的数百种功能。为了在 Kotlin 中实现屏幕共享&#xff0c;我们利用了 Chromium 的 WebRTC 支持以及 JxBrowser…

无人机动力系统节能技术的未来发展趋势——CKESC电调小课堂12.1

无人机动力系统节能技术的未来发展趋势包括以下几个方面&#xff1a; 1. 能源类型多元化与高效化 新型电池技术的发展&#xff1a;锂离子电池的性能将不断提升&#xff0c;能量密度增加、充放电速度加快、循环寿命延长。同时&#xff0c;固态电池技术有望取得突破并应用于无人…

【汇编语言】数据处理的两个基本问题(二) —— 解密汇编语言:数据长度与寻址方式的综合应用

文章目录 前言1. 指令要处理的数据有多长&#xff1f;1.1 通过寄存器指明数据的尺寸1.1.1 字操作1.1.2 字节操作 1.2 用操作符X ptr指明内存单元的长度1.2.1 访问字单元1.2.2 访问字节单元1.2.3 为什么要用操作符X ptr指明 1.3 其他方法 2. 寻址方式的综合应用2.1 问题背景&…

【算法】【优选算法】前缀和(下)

目录 一、560.和为K的⼦数组1.1 前缀和1.2 暴力枚举 二、974.和可被K整除的⼦数组2.1 前缀和2.2 暴力枚举 三、525.连续数组3.1 前缀和3.2 暴力枚举 四、1314.矩阵区域和4.1 前缀和4.2 暴力枚举 一、560.和为K的⼦数组 题目链接&#xff1a;560.和为K的⼦数组 题目描述&#x…