一、WebSocket产生背景
在传统的Web通信中,浏览器是基于请求--响应模式。这种方式的缺点是,浏览器必须始终主动发起请求才能获取更新的数据,而且每次请求都需要经过HTTP的握手和头部信息的传输,造成了较大的网络开销。如果客户端需要及时获得服务端数据,要么通过定时轮训、长轮训或Commet机制,但都不能完美解决。
WebSocket解决了这些问题,它提供了一种持久的连接机制,允许服务器实时地向浏览器推送数据,而无需浏览器重新发起请求。这种双向通信的方式使得Web应用程序能够实时地向用户提供更新的数据,比如在线聊天、实时通知等。
WebSocket底层是基于HTTP协议,并使用了类似握手的过程来建立连接。连接一旦建立,就可以通过发送和接收消息来进行实时通信。WebSocket消息可以是文本、二进制或者其他格式。
二、使用Netty创建WebSocket服务端
2.1 设置解析websocket协议的ChannelHandler
websocket基于http协议,因此netty创建websocket和http的代码非常相似,如下
private class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel arg0) throws Exception {
ChannelPipeline pipeline = arg0.pipeline();
// HttpServerCodec: 针对http协议进行编解码
pipeline.addLast("httpServerCodec", new HttpServerCodec());
// ChunkedWriteHandler分块写处理,文件过大会将内存撑爆
pipeline.addLast("chunkedWriteHandler", new ChunkedWriteHandler());
pipeline.addLast("httpObjectAggregator", new HttpObjectAggregator(8192));
// 用于处理websocket, /ws为访问websocket时的uri
pipeline.addLast("webSocketServerProtocolHandler", new WebSocketServerProtocolHandler("/ws"));
// 这里需要再进行二次编解码
}
}
上面代码前三个handler跟http一致,新增了一个webSocketServerProtocolHandler处理器,该处理器限制了该服务只提供websocket服务,屏蔽了底层握手、编解码、心跳和断开连接等事件。
至此,服务器可以接收到一个完整的WebSocketFrame包,但业务代码不是基于WebSocketFrame数据,例如jforgame消息包为Message实现类。因此我们需要二次消息解码。
这里的二次编码将进行WebSocketFrame到私有消息的转换,是websocket适配最麻烦的地方,相当于把私有协议栈重新实现一遍。对于TextWebSocketFrame可能还简单一点,客户端只需将包含包头(消息id)和包体(具体消息)的数据进行json化即可。而对于BinaryWebSocketFrame,客户端需要引入第三方消息编解码工具,例如protobuf。(如果客户端使用json,就不用多此一举使用二进制格式了)
2.2WebSocketFrame解码为私有协议消息
pipeline.addLast("socketFrameToMessage", new MessageToMessageDecoder<WebSocketFrame>() {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, WebSocketFrame frame, List<Object> list) throws Exception {
if (frame instanceof TextWebSocketFrame) {
String json = ((TextWebSocketFrame)frame).text();
TextFrame textFrame = JsonUtils.string2Object(json, TextFrame.class);
Class clazz = DefaultMessageFactory.getInstance().getMessage(NumberUtil.intValue(textFrame.id));
Object realMsg = JsonUtils.string2Object(textFrame.msg, clazz);
System.out.println(textFrame);
list.add(realMsg);
} else if (frame instanceof BinaryWebSocketFrame) {
throw new UnsupportedOperationException("BinaryWebSocketFrame not supported");
}
}
});
其中,TextFrame是websocket客户端与服务端通信格式,只有两个字段
static class TextFrame {
// 消息id
String id;
// 消息内容
String msg;
}
注:这里只处理 TextWebSocketFrame文本格式,至于BinaryWebSocketFrame二进制格式,由于使用JavaScript需引入Protobuf等第三方库,这里不做演示。
2.3私有协议消息编码为WebSocketFrame
当服务器向客户端推送消息的时候,需要将私有协议包转为WebSocketFrame。
pipeline.addLast("messageToSocketFrame", new MessageToMessageEncoder<Object>() {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Object o, List<Object> list) throws Exception {
if (DefaultMessageFactory.getInstance().contains(o.getClass())) {
String json = JsonUtils.object2String(o);
TextFrame frame = new TextFrame();
frame.id = String.valueOf(DefaultMessageFactory.getInstance().getMessageId(o.getClass()));
frame.msg = json;
list.add(new TextWebSocketFrame(JsonUtils.object2String(frame)));
} else if (o instanceof ReferenceCounted) {
((ReferenceCounted)o).retain();
list.add(o);
} else {
list.add(o);
}
}
});
注:在二次编码的时候,遇到ReferenceCounted子类,需要 retain()一下才可以传递给下一个handler。
2.4将完整业务消息包丢给业务代码执行
pipeline.addLast(new DefaultSocketIoHandler(new MessageIoDispatcher(ServerScanPaths.MESSAGE_PATH)));
至此,把websocket底层的协议差异给屏蔽掉了,无论服务端是采用socket还是websocket,业务代码无需做任何改变。
2.5 服务端完整代码
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.ReferenceCounted;
import jforgame.commons.NumberUtil;
import jforgame.demo.ServerScanPaths;
import jforgame.demo.socket.MessageIoDispatcher;
import jforgame.demo.utils.JsonUtils;
import jforgame.socket.netty.support.DefaultSocketIoHandler;
import jforgame.socket.share.HostAndPort;
import jforgame.socket.share.ServerNode;
import jforgame.socket.support.DefaultMessageFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.List;
public class NWebSocketServer implements ServerNode {
private Logger logger = LoggerFactory.getLogger(NWebSocketServer.class);
// 避免使用默认线程数参数
private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
private EventLoopGroup workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors());
private List<HostAndPort> nodesConfig;
public NWebSocketServer(HostAndPort hostPort) {
this.nodesConfig = Arrays.asList(hostPort);
}
@Override
public void start() throws Exception {
try {
DefaultMessageFactory.getInstance().initMessagePool(ServerScanPaths.MESSAGE_PATH);
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO)).childHandler(new WebSocketChannelInitializer());
for (HostAndPort node : nodesConfig) {
logger.info("socket server is listening at " + node.getPort() + "......");
serverBootstrap.bind(new InetSocketAddress(node.getPort())).sync();
}
} catch (Exception e) {
logger.error("", e);
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
throw e;
}
}
@Override
public void shutdown() throws Exception {
if (bossGroup != null) {
bossGroup.shutdownGracefully();
}
if (workerGroup != null) {
workerGroup.shutdownGracefully();
}
}
private class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel arg0) throws Exception {
ChannelPipeline pipeline = arg0.pipeline();
// HttpServerCodec: 针对http协议进行编解码
pipeline.addLast("httpServerCodec", new HttpServerCodec());
// ChunkedWriteHandler分块写处理,文件过大会将内存撑爆
pipeline.addLast("chunkedWriteHandler", new ChunkedWriteHandler());
pipeline.addLast("httpObjectAggregator", new HttpObjectAggregator(8192));
// 用于处理websocket, /ws为访问websocket时的uri
pipeline.addLast("webSocketServerProtocolHandler", new WebSocketServerProtocolHandler("/ws"));
pipeline.addLast("socketFrameToMessage", new MessageToMessageDecoder<WebSocketFrame>() {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, WebSocketFrame frame, List<Object> list) throws Exception {
if (frame instanceof TextWebSocketFrame) {
String json = ((TextWebSocketFrame)frame).text();
TextFrame textFrame = JsonUtils.string2Object(json, TextFrame.class);
Class clazz = DefaultMessageFactory.getInstance().getMessage(NumberUtil.intValue(textFrame.id));
Object realMsg = JsonUtils.string2Object(textFrame.msg, clazz);
System.out.println(textFrame);
list.add(realMsg);
} else if (frame instanceof BinaryWebSocketFrame) {
throw new UnsupportedOperationException("BinaryWebSocketFrame not supported");
}
}
});
pipeline.addLast("messageToSocketFrame", new MessageToMessageEncoder<Object>() {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Object o, List<Object> list) throws Exception {
if (DefaultMessageFactory.getInstance().contains(o.getClass())) {
String json = JsonUtils.object2String(o);
TextFrame frame = new TextFrame();
frame.id = String.valueOf(DefaultMessageFactory.getInstance().getMessageId(o.getClass()));
frame.msg = json;
list.add(new TextWebSocketFrame(JsonUtils.object2String(frame)));
} else if (o instanceof ReferenceCounted) {
((ReferenceCounted)o).retain();
list.add(o);
} else {
list.add(o);
}
}
});
pipeline.addLast(new DefaultSocketIoHandler(new MessageIoDispatcher(ServerScanPaths.MESSAGE_PATH)));
}
}
static class TextFrame {
// 消息id
String id;
// 消息内容
String msg;
}
public static void main(String[] args) throws Exception{
NWebSocketServer socketServer = new NWebSocketServer(HostAndPort.valueOf("localhost", 8080));
socketServer.start();
}
}
三、使用js websocket客户端测试代码
3.1js封装weboskcet操作
/**
* 对webSocket的封装
*/
(function($) {
$.config = {
url: '', //链接地址
};
$.init=function(config) {
this.config = config;
return this;
};
/**
* 连接webcocket
*/
$.connect = function() {
var protocol = (window.location.protocol == 'http:') ? 'ws:' : 'ws:';
this.host = protocol + this.config.url;
window.WebSocket = window.WebSocket || window.MozWebSocket;
if(!window.WebSocket) { // 检测浏览器支持
this.error('Error: WebSocket is not supported .');
return;
}
this.socket = new WebSocket(this.host); // 创建连接并注册响应函数
this.socket.onopen = function() {
$.onopen();
};
this.socket.onmessage = function(message) {
$.onmessage(message);
};
this.socket.onclose = function() {
$.onclose();
$.socket = null; // 清理
};
this.socket.onerror = function(errorMsg) {
$.onerror(errorMsg);
}
return this;
}
/**
* 自定义异常函数
* @param {Object} errorMsg
*/
$.error = function(errorMsg) {
this.onerror(errorMsg);
}
/**
* 消息发送
*/
$.send = function(msgId, msg) {
if(this.socket) {
var req = {
"id" : msgId,
"msg" : JSON.stringify(msg)
}
this.socket.send(JSON.stringify(req));
return true;
}
this.error('please connect to the server first !!!');
return false;
}
/**
* 消息二进制数据(测试)
*/
$.sendBytes = function(msgId, msg) {
if(this.socket) {
this.socket.send(new TextEncoder().encode("hello"));
return true;
}
this.error('please connect to the server first !!!');
return false;
}
$.close = function() {
if(this.socket != undefined && this.socket != null) {
this.socket.close();
} else {
this.error("this socket is not available");
}
}
/**
* 消息回調
* @param {Object} message
*/
$.onmessage = function(message) {
}
/**
* 链接回调函数
*/
$.onopen = function() {
}
/**
* 关闭回调
*/
$.onclose = function() {
}
/**
* 异常回调
*/
$.onerror = function() {
}
})(ws = {});
3.2 业务消息注册及发送
/**
* 与服务端的通信协议绑定
*/
var io_handler = io_handler || {}
io_handler.ReqAccountLogin = "101001";
io_handler.ResAccountLogin = "101051";
var self = io_handler;
var msgHandler = {}
io_handler.bind = function(msgId, handler) {
msgHandler[msgId] = handler
}
self.bind(self.ResAccountLogin, function(resp) {
alert("角色登录成功-->" + resp)
})
io_handler.handle = function(msgId, msg) {
msgHandler[msgId](msg);
}
3.3html测试代码
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>WebSocket 客户端</title>
<script src="js/ws.js" type="text/javascript"></script>
<script src="js/io_handler.js" type="text/javascript"></script>
<script type="text/javascript">
ws.init({
url : "localhost:8080/ws"
}).connect();
//当有消息过来的时候触发
ws.onmessage = function(event) {
var resp = JSON.parse(event.data)
var respMessage = document.getElementById("respMessage");
respMessage.value = respMessage.value + "\n" + resp.msg;
io_handler.handle(resp.id, resp.msg)
}
//连接关闭的时候触发
ws.onclose = function(event) {
var respMessage = document.getElementById("respMessage");
respMessage.value = respMessage.value + "\n断开连接";
}
//连接打开的时候触发
ws.onopen = function(event) {
var respMessage = document.getElementById("respMessage");
respMessage.value = "建立连接";
}
function sendMsg(msg) { //发送消息
if (window.WebSocket) {
var msg = {
"accountId" : 123,
"password":"abc"
};
ws.send(io_handler.ReqAccountLogin , msg);
}
}
</script>
</head>
<body>
<form onsubmit="return false">
<textarea style="width: 300px; height: 200px;" name="message"></textarea>
<input type="button" onclick="sendMsg(this.form.message.value)"
value="发送"><br>
<h3>信息</h3>
<textarea style="width: 300px; height: 200px;" id="respMessage"></textarea>
<input type="button" value="清空"
onclick="javascript:document.getElementById('respMessage').value = ''">
</form>
</body>
</html>
3.4 客户端运行示例
启动服务器后,点击html测试文件,发送数据
完整代码传送门 --》 jforgame游戏框架