Netty之WebSocket协议开发

news2025/1/22 13:04:38

一、WebSocket产生背景

在传统的Web通信中,浏览器是基于请求--响应模式。这种方式的缺点是,浏览器必须始终主动发起请求才能获取更新的数据,而且每次请求都需要经过HTTP的握手和头部信息的传输,造成了较大的网络开销。如果客户端需要及时获得服务端数据,要么通过定时轮训、长轮训或Commet机制,但都不能完美解决。

WebSocket解决了这些问题,它提供了一种持久的连接机制,允许服务器实时地向浏览器推送数据,而无需浏览器重新发起请求。这种双向通信的方式使得Web应用程序能够实时地向用户提供更新的数据,比如在线聊天、实时通知等。

WebSocket底层是基于HTTP协议,并使用了类似握手的过程来建立连接。连接一旦建立,就可以通过发送和接收消息来进行实时通信。WebSocket消息可以是文本、二进制或者其他格式。

二、使用Netty创建WebSocket服务端

2.1 设置解析websocket协议的ChannelHandler

websocket基于http协议,因此netty创建websocket和http的代码非常相似,如下

private class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel arg0) throws Exception {
            ChannelPipeline pipeline = arg0.pipeline();

            // HttpServerCodec: 针对http协议进行编解码
            pipeline.addLast("httpServerCodec", new HttpServerCodec());
            // ChunkedWriteHandler分块写处理,文件过大会将内存撑爆
            pipeline.addLast("chunkedWriteHandler", new ChunkedWriteHandler());
            pipeline.addLast("httpObjectAggregator", new HttpObjectAggregator(8192));

            // 用于处理websocket, /ws为访问websocket时的uri
            pipeline.addLast("webSocketServerProtocolHandler", new WebSocketServerProtocolHandler("/ws"));

            // 这里需要再进行二次编解码
         }
}

上面代码前三个handler跟http一致,新增了一个webSocketServerProtocolHandler处理器,该处理器限制了该服务只提供websocket服务,屏蔽了底层握手、编解码、心跳和断开连接等事件。

至此,服务器可以接收到一个完整的WebSocketFrame包,但业务代码不是基于WebSocketFrame数据,例如jforgame消息包为Message实现类。因此我们需要二次消息解码。

这里的二次编码将进行WebSocketFrame到私有消息的转换,是websocket适配最麻烦的地方,相当于把私有协议栈重新实现一遍。对于TextWebSocketFrame可能还简单一点,客户端只需将包含包头(消息id)和包体(具体消息)的数据进行json化即可。而对于BinaryWebSocketFrame,客户端需要引入第三方消息编解码工具,例如protobuf。(如果客户端使用json,就不用多此一举使用二进制格式了)

2.2WebSocketFrame解码为私有协议消息

 pipeline.addLast("socketFrameToMessage", new MessageToMessageDecoder<WebSocketFrame>() {
                @Override
                protected void decode(ChannelHandlerContext channelHandlerContext, WebSocketFrame frame, List<Object> list) throws Exception {
                    if (frame instanceof TextWebSocketFrame) {
                        String json =  ((TextWebSocketFrame)frame).text();
                        TextFrame textFrame = JsonUtils.string2Object(json, TextFrame.class);

                        Class clazz = DefaultMessageFactory.getInstance().getMessage(NumberUtil.intValue(textFrame.id));
                        Object realMsg = JsonUtils.string2Object(textFrame.msg, clazz);
                        System.out.println(textFrame);
                        list.add(realMsg);
                    } else if (frame instanceof BinaryWebSocketFrame) {
                        throw new UnsupportedOperationException("BinaryWebSocketFrame not supported");
                    }
                }
            });

其中,TextFrame是websocket客户端与服务端通信格式,只有两个字段

    static class TextFrame {
        // 消息id
        String id;
        // 消息内容
        String msg;

    }

注:这里只处理 TextWebSocketFrame文本格式,至于BinaryWebSocketFrame二进制格式,由于使用JavaScript需引入Protobuf等第三方库,这里不做演示。

2.3私有协议消息编码为WebSocketFrame

当服务器向客户端推送消息的时候,需要将私有协议包转为WebSocketFrame。

 pipeline.addLast("messageToSocketFrame", new MessageToMessageEncoder<Object>() {
                @Override
                protected void encode(ChannelHandlerContext channelHandlerContext, Object o, List<Object> list) throws Exception {
                    if (DefaultMessageFactory.getInstance().contains(o.getClass())) {
                        String json = JsonUtils.object2String(o);
                        TextFrame frame = new TextFrame();
                        frame.id = String.valueOf(DefaultMessageFactory.getInstance().getMessageId(o.getClass()));
                        frame.msg = json;
                        list.add(new TextWebSocketFrame(JsonUtils.object2String(frame)));
                    } else if (o instanceof ReferenceCounted) {
                        ((ReferenceCounted)o).retain();
                        list.add(o);
                    } else {
                        list.add(o);
                    }
                }
            });

注:在二次编码的时候,遇到ReferenceCounted子类,需要 retain()一下才可以传递给下一个handler。

2.4将完整业务消息包丢给业务代码执行

pipeline.addLast(new DefaultSocketIoHandler(new MessageIoDispatcher(ServerScanPaths.MESSAGE_PATH)));

至此,把websocket底层的协议差异给屏蔽掉了,无论服务端是采用socket还是websocket,业务代码无需做任何改变。

2.5 服务端完整代码

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.ReferenceCounted;
import jforgame.commons.NumberUtil;
import jforgame.demo.ServerScanPaths;
import jforgame.demo.socket.MessageIoDispatcher;
import jforgame.demo.utils.JsonUtils;
import jforgame.socket.netty.support.DefaultSocketIoHandler;
import jforgame.socket.share.HostAndPort;
import jforgame.socket.share.ServerNode;
import jforgame.socket.support.DefaultMessageFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.List;

public class NWebSocketServer implements ServerNode {

    private Logger logger = LoggerFactory.getLogger(NWebSocketServer.class);

    // 避免使用默认线程数参数
    private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    private EventLoopGroup workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors());

    private List<HostAndPort> nodesConfig;

    public NWebSocketServer(HostAndPort hostPort) {
        this.nodesConfig = Arrays.asList(hostPort);
    }

    @Override
    public void start() throws Exception {
        try {
            DefaultMessageFactory.getInstance().initMessagePool(ServerScanPaths.MESSAGE_PATH);
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new WebSocketChannelInitializer());

            for (HostAndPort node : nodesConfig) {
                logger.info("socket server is listening at " + node.getPort() + "......");
                serverBootstrap.bind(new InetSocketAddress(node.getPort())).sync();
            }
        } catch (Exception e) {
            logger.error("", e);

            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();

            throw e;
        }
    }

    @Override
    public void shutdown() throws Exception {
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
    }

    private class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel arg0) throws Exception {
            ChannelPipeline pipeline = arg0.pipeline();

            // HttpServerCodec: 针对http协议进行编解码
            pipeline.addLast("httpServerCodec", new HttpServerCodec());
            // ChunkedWriteHandler分块写处理,文件过大会将内存撑爆
            pipeline.addLast("chunkedWriteHandler", new ChunkedWriteHandler());
            pipeline.addLast("httpObjectAggregator", new HttpObjectAggregator(8192));

            // 用于处理websocket, /ws为访问websocket时的uri
            pipeline.addLast("webSocketServerProtocolHandler", new WebSocketServerProtocolHandler("/ws"));

            pipeline.addLast("socketFrameToMessage", new MessageToMessageDecoder<WebSocketFrame>() {
                @Override
                protected void decode(ChannelHandlerContext channelHandlerContext, WebSocketFrame frame, List<Object> list) throws Exception {
                    if (frame instanceof TextWebSocketFrame) {
                        String json =  ((TextWebSocketFrame)frame).text();
                        TextFrame textFrame = JsonUtils.string2Object(json, TextFrame.class);

                        Class clazz = DefaultMessageFactory.getInstance().getMessage(NumberUtil.intValue(textFrame.id));
                        Object realMsg = JsonUtils.string2Object(textFrame.msg, clazz);
                        System.out.println(textFrame);
                        list.add(realMsg);
                    } else if (frame instanceof BinaryWebSocketFrame) {
                        throw new UnsupportedOperationException("BinaryWebSocketFrame not supported");
                    }
                }
            });

            pipeline.addLast("messageToSocketFrame", new MessageToMessageEncoder<Object>() {
                @Override
                protected void encode(ChannelHandlerContext channelHandlerContext, Object o, List<Object> list) throws Exception {
                    if (DefaultMessageFactory.getInstance().contains(o.getClass())) {
                        String json = JsonUtils.object2String(o);
                        TextFrame frame = new TextFrame();
                        frame.id = String.valueOf(DefaultMessageFactory.getInstance().getMessageId(o.getClass()));
                        frame.msg = json;
                        list.add(new TextWebSocketFrame(JsonUtils.object2String(frame)));
                    } else if (o instanceof ReferenceCounted) {
                        ((ReferenceCounted)o).retain();
                        list.add(o);
                    } else {
                        list.add(o);
                    }
                }
            });
            pipeline.addLast(new DefaultSocketIoHandler(new MessageIoDispatcher(ServerScanPaths.MESSAGE_PATH)));
        }
    }

    static class TextFrame {
        // 消息id
        String id;
        // 消息内容
        String msg;

    }


    public static void main(String[] args) throws Exception{
        NWebSocketServer socketServer = new NWebSocketServer(HostAndPort.valueOf("localhost", 8080));
        socketServer.start();
    }

}

三、使用js websocket客户端测试代码

3.1js封装weboskcet操作

/**
 * 对webSocket的封装 
 */
(function($) {
 
 
	$.config = {
		url: '', //链接地址
	};
 
 
	$.init=function(config) {
		this.config = config;
		return this;
	};
 
 
	/**
	 * 连接webcocket
	 */
	$.connect = function() {
		var protocol = (window.location.protocol == 'http:') ? 'ws:' : 'ws:';
		this.host = protocol + this.config.url;
 
 
		window.WebSocket = window.WebSocket || window.MozWebSocket;
		if(!window.WebSocket) { // 检测浏览器支持  
			this.error('Error: WebSocket is not supported .');
			return;
		}
		this.socket = new WebSocket(this.host); // 创建连接并注册响应函数  
		this.socket.onopen = function() {
			$.onopen();
		};
		this.socket.onmessage = function(message) {
			$.onmessage(message);
		};
		this.socket.onclose = function() {
			$.onclose();
			$.socket = null; // 清理  
		};
		this.socket.onerror = function(errorMsg) {
			$.onerror(errorMsg);
		}
		return this;
	}
 
 
	/**
	 * 自定义异常函数
	 * @param {Object} errorMsg
	 */
	$.error = function(errorMsg) {
		this.onerror(errorMsg);
	}
 
 
	/**
	 * 消息发送
	 */
	$.send = function(msgId, msg) {
		if(this.socket) {
			var req = {
					"id" : msgId,
					"msg" : JSON.stringify(msg)
				}
			this.socket.send(JSON.stringify(req));
			return true;
		}
		this.error('please connect to the server first !!!');
		return false;
	}

	/**
	 * 消息二进制数据(测试)
	 */
	$.sendBytes = function(msgId, msg) {
		if(this.socket) {
			this.socket.send(new TextEncoder().encode("hello"));
			return true;
		}
		this.error('please connect to the server first !!!');
		return false;
	}
 
 
	$.close = function() {
		if(this.socket != undefined && this.socket != null) {
			this.socket.close();
		} else {
			this.error("this socket is not available");
		}
	}
 
 
	/**
	 * 消息回調
	 * @param {Object} message
	 */
	$.onmessage = function(message) {
 
 
	}
 
 
	/**
	 * 链接回调函数
	 */
	$.onopen = function() {
 
 
	}
 
 
	/**
	 * 关闭回调
	 */
	$.onclose = function() {
 
 
	}
 
 
	/**
	 * 异常回调
	 */
	$.onerror = function() {
 
 
	}
 
 
})(ws = {});

3.2 业务消息注册及发送

/**
 * 与服务端的通信协议绑定
 */

var io_handler = io_handler || {}

io_handler.ReqAccountLogin = "101001";

io_handler.ResAccountLogin = "101051";

var self = io_handler;

var msgHandler = {}

io_handler.bind = function(msgId, handler) {
	msgHandler[msgId] = handler
}

self.bind(self.ResAccountLogin, function(resp) {
	alert("角色登录成功-->" + resp)
})

io_handler.handle = function(msgId, msg) {
	msgHandler[msgId](msg);
}

3.3html测试代码 

<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>WebSocket 客户端</title>

<script src="js/ws.js" type="text/javascript"></script>
<script src="js/io_handler.js" type="text/javascript"></script>
<script type="text/javascript">
	ws.init({
		url : "localhost:8080/ws"
	}).connect();

	//当有消息过来的时候触发
	ws.onmessage = function(event) {
		var resp = JSON.parse(event.data)
		var respMessage = document.getElementById("respMessage");
		respMessage.value = respMessage.value + "\n" + resp.msg;
		
		io_handler.handle(resp.id, resp.msg)
	}

	//连接关闭的时候触发
	ws.onclose = function(event) {
		var respMessage = document.getElementById("respMessage");
		respMessage.value = respMessage.value + "\n断开连接";
	}

	//连接打开的时候触发
	ws.onopen = function(event) {
		var respMessage = document.getElementById("respMessage");
		respMessage.value = "建立连接";
	}

	function sendMsg(msg) { //发送消息 
		if (window.WebSocket) {
			var msg = {
				"accountId" : 123,
				"password":"abc"
			};
			ws.send(io_handler.ReqAccountLogin , msg);
		}
	}
</script>
</head>
<body>
	<form onsubmit="return false">
		<textarea style="width: 300px; height: 200px;" name="message"></textarea>
		<input type="button" onclick="sendMsg(this.form.message.value)"
			value="发送"><br>
		<h3>信息</h3>
		<textarea style="width: 300px; height: 200px;" id="respMessage"></textarea>
		<input type="button" value="清空"
			onclick="javascript:document.getElementById('respMessage').value = ''">
	</form>
</body>
</html>

3.4 客户端运行示例

启动服务器后,点击html测试文件,发送数据

完整代码传送门 --》 jforgame游戏框架 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1494251.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

git 命令怎么回退到某个特定的 commit 并将其推送到远程仓库?

问题 不小心把提交的名称写错提交上远程仓库了&#xff0c;这里应该是 【029】的&#xff0c;这个时候我们想回到【028】这一个提交记录&#xff0c;然后再重新提交【029】到远程仓库&#xff0c;该怎么处理。 解决 1、首先我们找到【028】这条记录的提交 hash&#xff0c;右…

微信小程序开发系列(八)·微信小程序页面的划分以及轮播图区域的绘制和图片的添加

目录 1. 划分页面结构 2. 轮播图区域绘制 3. 轮播图图片添加 1. 划分页面结构 最终我们想达到如下效果&#xff1a; 其页面分为四层结构&#xff0c;因此我们需要配置四块view&#xff0c;代码如下&#xff1a; <!-- view 小程序提供的容器组件&#xff0c;可以当成…

《ChatGPT原理与架构:大模型的预训练、迁移和中间件编程 》

OpenAI 在 2022 年 11 月推出了人工智能聊天应用—ChatGPT。它具有广泛的应用场景&#xff0c;在多项专业和学术基准测试中表现出的智力水平&#xff0c;不仅接近甚至有时超越了人类的平均水平。这使得 ChatGPT 在推出之初就受到广大用户的欢迎&#xff0c;被科技界誉为人工智能…

zabbix监控中间件服务

zabbix监控Nginx 自定义nginx访问量的监控项&#xff0c;首先要通过脚本将各种状态的值取出来&#xff0c;然后通过zabbix监控。找到自定义脚本上传到指定目录/etc/zabbix/script/ 在zbx-client客户端主机操作 #创建目录&#xff0c;然后将脚本上传到该目录mkdir /etc/zabbix/…

7,图像镜像变换

水平镜像就是x图像宽度-原来的x&#xff0c; 垂直镜像就是y图像高度-原来的y void CDib::Mirror_Horizontal() { //指向原图像指针 LPBYTE lpSrc; LPBYTE p_data GetData(); //指向复制区域的指针 LPBYTE lpDst; //图像的宽和高 LONG width GetWidth(); LONG height GetHei…

备战蓝桥杯————二分查找(二)

引言 在上一篇博客中&#xff0c;我们深入探讨了二分搜索算法及其在寻找数组左侧边界的应用。二分搜索作为一种高效的查找方法&#xff0c;其核心思想在于通过不断缩小搜索范围来定位目标值。在本文中&#xff0c;我们将继续这一主题&#xff0c;不仅会回顾二分搜索的基本原理&…

【C++专栏】C++入门 | 命名空间、输入输出、缺省参数

博客主页&#xff1a;Duck Bro 博客主页系列专栏&#xff1a;C专栏关注博主&#xff0c;后期持续更新系列文章如果有错误感谢请大家批评指出&#xff0c;及时修改感谢大家 点赞&#x1f44d;收藏⭐评论✍ C入门 | 命名空间、输入输出、缺省参数 文章编号&#xff1a;C入门 / 0…

Java agent技术的注入利用与避坑点

什么是Java agent技术&#xff1f; Java代理&#xff08;Java agent&#xff09;是一种Java技术&#xff0c;它允许开发人员在运行时以某种方式修改或增强Java应用程序的行为。Java代理通过在Java虚拟机&#xff08;JVM&#xff09;启动时以"代理"&#xff08;agent…

react native中如何使用webView调用腾讯地图选点组件

react native中如何使用webView调用腾讯地图选点组件 效果示例图代码示例备注 效果示例图 代码示例 import React, {useEffect, useRef, useState} from react; import {Modal, StyleSheet} from react-native; import {pxToPd} from ../../common/js/device; import {WebView…

使用PDFBox封装一个简单易用的工具类快速生成pdf文件

文章目录 一、PDFbox说明1、坐标2、线3、图4、字5、字体加载6、jfreechart图表转字节数组7、依赖二、PDFbox样式1、文字颜色2、线颜色3、线样式三、工具类边框样式对齐样式表行列图片列pdf工具类测试方法四、效果图一、PDFbox说明 1、坐标 文档左下角为坐标原点,x轴向右从0增…

Cluade3干货:超越GPT,模型特点分析+使用教程|2024年3月更新

就在刚刚&#xff0c;Claude 发布了最新的大模型 Claude3&#xff0c;并且一次性发布了三个模型&#xff0c;分别是 Claude 3 Haiku&#xff1a;&#xff08;日本俳句 &#xff09;Claude 3 Sonnet&#xff08;英文十四行诗&#xff09;Claude 3 Opus&#xff08;古典乐作品集…

HarmonyOS NEXT应用开发案例——滑动页面信息隐藏与组件位移效果

介绍 在很多应用中&#xff0c;向上滑动"我的"页面&#xff0c;页面顶部会有如下变化效果&#xff1a;一部分信息逐渐隐藏&#xff0c;另一部分信息逐渐显示&#xff0c;同时一些组件会进行缩放或者位置移动。向下滑动时则相反。 效果图预览 使用说明 向上滑动页面…

Vue:双token无感刷新

文章目录 初次授权与发放Token&#xff1a;Access Token的作用&#xff1a;Refresh Token的作用&#xff1a;无感刷新&#xff1a;安全机制&#xff1a;后端创建nest项目AppController 添加login、refresh、getinfo接口创建user.dto.tsAppController添加模拟数据 前端Hbuilder创…

ARM中专用指令(异常向量表、异常源、异常返回等)

状态寄存器传送指令 CPSR寄存器 状态寄存器传送指令:访问&#xff08;读写&#xff09;CPSR寄存器 读CPSR MRS R1, CPSR R1 CPSR 写CPSR MSR CPSR, #0x10 0x10为User模式&#xff0c;且开启IRQ和FRQ CPSR 0x10 在USER模式下不能随意修改CPSR&#xff0c;因为USER模式…

js五星评价的制作方法

方法有两种&#xff0c;1、jquer插件&#xff1b;2、图片循环&#xff1b; 第一种、效果图 代码 <!DOCTYPE html> <html lang"zh"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"…

机器学习---拉格朗日乘子法、Huber Loss、极大似然函数取对数的原因

1. 拉格朗日乘子法 拉格朗日乘子法&#xff08;Lagrange multipliers&#xff09;是一种寻找多元函数在一组约束下的极值的方法。通过引 入拉格朗日乘子&#xff0c;可将有d个变量与k个约束条件的最优化问题转化为具有d&#xff0b;k个变量的无约束优化 问题求解。本文希望通…

java工程师面试笔试题,阿里+头条+抖音+百度+蚂蚁+京东面经

前言 分布式事务主要解决分布式一致性的问题。说到底就是数据的分布式操作导致仅依靠本地事务无法保证原子性。与单机版的事务不同的是&#xff0c;单机是把多个命令打包成一个统一处理&#xff0c;分布式事务是将多个机器上执行的命令打包成一个命令统一处理。 MySQL 提供了…

软件测试计划包括哪些内容?专业第三方软件测试机构推荐

软件测试计划是为确保软件质量而制定的详细计划&#xff0c;它在软件开发周期中扮演着至关重要的角色。一个良好的软件测试计划可以确保软件在交付给最终用户之前经过全面的测试和验证&#xff0c;减少软件出现缺陷和问题的可能性。 软件测试计划一般包括以下内容&#xff1a;…

汇编程序中引用头文件

文章目录 写在前面x86汇编示例(AT&T风格ARM汇编示例运行结果 写在前面 汇编程序中也是可以使用头文件的&#xff0c;因为头文件实际上就是预处理中的一环&#xff0c;使用预处理器也对汇编程序中的头文件进行预处理 本文使用的汇编例程&#xff1a; x86版 AT&T汇编hel…

打印螺旋矩阵

打印螺旋矩阵 题目 如&#xff1a;输入 n 5&#xff1b; 输出&#xff1a; 1 2 3 4 5 16 17 18 19 6 15 24 25 20 7 14 23 22 21 8 13 12 11 10 9解题 这种规律打印题我个人感觉是真的不好写&#xff0c;一看答案感觉也就那回事&#xff0c;真自己琢磨&#xff0c;半…