本文将使用Netty快速实现一个聊天室应用,该应用基于WebSocket协议,用户可以在浏览器内聊天。
实现过程很简单,就几步。
一、处理Http请求
package cn.md.netty.websocket.groupchat;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedNioFile;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.URISyntaxException;
import java.net.URL;
/**
 * Serves the chat page over plain HTTP and passes requests for the WebSocket URI on to
 * the next inbound handler in the pipeline.
 *
 * @Author: Martin
 * @Date 2024/9/2 11:21
 */
public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    /** Request URI reserved for the WebSocket endpoint. */
    private final String wsUri;

    /**
     * The HTML page returned for every request that does not target {@link #wsUri}.
     * The location is hard-coded; change it to wherever wsChatClient.html lives on
     * your machine.
     */
    private static final File PAGE_FILE = new File("/Users/jack/Documents/TMP/wsChatClient.html");

    /**
     * Creates a handler that treats {@code wsUri} as the WebSocket endpoint.
     *
     * @param wsUri request URI (compared case-insensitively) that must be forwarded
     *              instead of being answered with the page
     */
    public HttpRequestHandler(String wsUri) {
        this.wsUri = wsUri;
    }

    /**
     * Forwards requests for the WebSocket URI; answers every other request with the
     * contents of {@link #PAGE_FILE}, or with a 500 response if the file cannot be read.
     *
     * @param ctx     the context of this handler
     * @param request the aggregated HTTP request
     * @throws Exception if an error occurred
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        if (wsUri.equalsIgnoreCase(request.uri())) {
            // The request outlives this method once forwarded, so take an extra reference:
            // SimpleChannelInboundHandler releases the message after channelRead0 returns.
            ctx.fireChannelRead(request.retain());
            return;
        }
        if (HttpUtil.is100ContinueExpected(request)) {
            send100Continue(ctx);
        }

        // Load the whole page into memory.
        byte[] fileContent;
        try (RandomAccessFile file = new RandomAccessFile(PAGE_FILE, "r")) {
            fileContent = new byte[(int) file.length()];
            file.readFully(fileContent);
        } catch (IOException e) {
            e.printStackTrace();
            sendServerError(ctx);
            return;
        }

        DefaultFullHttpResponse resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        resp.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8");
        // Always announce the body length so the client never has to wait for a close.
        resp.headers().set(HttpHeaderNames.CONTENT_LENGTH, fileContent.length);
        boolean keepAlive = HttpUtil.isKeepAlive(request);
        if (keepAlive) {
            resp.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        }
        resp.content().writeBytes(fileContent);

        ChannelFuture channelFuture = ctx.writeAndFlush(resp);
        if (!keepAlive) {
            channelFuture.addListener(ChannelFutureListener.CLOSE);
        }
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                // Only report success when the write really succeeded.
                if (future.isSuccess()) {
                    System.out.println("response empty last content success !");
                } else {
                    future.cause().printStackTrace();
                }
            }
        });
    }

    /** Writes an interim 100 Continue response. */
    private void send100Continue(ChannelHandlerContext ctx) {
        DefaultFullHttpResponse resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
        ctx.writeAndFlush(resp);
    }

    /**
     * Writes an empty 500 response and closes the connection afterwards, so the client
     * is not left waiting for a body on a kept-alive connection.
     */
    private void sendServerError(ChannelHandlerContext ctx) {
        DefaultFullHttpResponse errorResponse =
                new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR);
        errorResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH, 0);
        ctx.writeAndFlush(errorResponse).addListener(ChannelFutureListener.CLOSE);
    }

    /** Logs the failure together with the remote address and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Channel channel = ctx.channel();
        System.out.println("client " + channel.remoteAddress() + "异常");
        cause.printStackTrace();
        ctx.close();
    }
}
二、处理WebSocket帧
package cn.md.netty.websocket.groupchat;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
/**
* 处理Websocket帧
* * @Author: Martin
* * @Date 2024/9/2 13:21
* * @Description
**/
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channels.add(channel);
// 广播
channels.writeAndFlush(new TextWebSocketFrame("[server] - " + channel.remoteAddress() + "加入"));
System.out.println(channel.remoteAddress() + "加入");
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channels.writeAndFlush(new TextWebSocketFrame("[server] - " + channel.remoteAddress() + "离开"));
System.out.println(channel.remoteAddress() + "离开");
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
System.out.println(channel.remoteAddress() + "在线");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
System.out.println(channel.remoteAddress() + "掉线");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Channel channel = ctx.channel();
System.out.println(channel.remoteAddress() + " 异常");
cause.printStackTrace();
ctx.close();
}
/**
* Is called for each message of type {@link I}.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link SimpleChannelInboundHandler}
* belongs to
* @param msg the message to handle
* @throws Exception is thrown if an error occurred
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
System.out.println("收到消息:" + msg.text());
Channel channel = ctx.channel();
for (Channel ch : channels){
if (channel != ch){
ch.writeAndFlush(new TextWebSocketFrame(channel.remoteAddress() + ":" + msg.text()));
}else {
ch.writeAndFlush(new TextWebSocketFrame("[you]" + msg.text()));
}
}
}
}
三、实现ChannelInitializer
package cn.md.netty.websocket.groupchat;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
/**
 * Sets up the handler pipeline of every connection accepted by the chat server.
 *
 * @Author: Martin
 * @Date 2024/9/2 13:37
 */
public class WebSocketChatServerInitializer extends ChannelInitializer<SocketChannel> {
    /**
     * Appends the handlers in this order: HTTP codec, HTTP message aggregator
     * (64 * 1024 passed as its size limit), chunked writer, the page/forwarding HTTP
     * handler, the WebSocket protocol handler for {@code /ws}, and the chat frame handler.
     *
     * @param ch the channel being initialised
     * @throws Exception if an error occurs
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new HttpObjectAggregator(64 * 1024));
        pipeline.addLast(new ChunkedWriteHandler());
        pipeline.addLast(new HttpRequestHandler("/ws"));
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
        pipeline.addLast(new TextWebSocketFrameHandler());
    }
}
四、服务器启动程序
package cn.md.netty.websocket.groupchat;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * Entry point of the chat server: binds the configured port and blocks until the
 * server channel is closed.
 *
 * @Author: Martin
 * @Date 2024/9/2 13:40
 */
public class WebSocketChatServer {
    /** TCP port the server binds to. */
    private final int port;

    /**
     * @param port TCP port the server binds to
     */
    public WebSocketChatServer(int port) {
        this.port = port;
    }

    /** Starts the server on port 8088. */
    public static void main(String[] args) throws Exception {
        WebSocketChatServer server = new WebSocketChatServer(8088);
        server.run();
    }

    /**
     * Binds the port, then waits for the server channel to close. Both event loop
     * groups are shut down on the way out, whether or not startup succeeded.
     *
     * @throws Exception if binding or waiting fails
     */
    public void run() throws Exception {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.childHandler(new WebSocketChatServerInitializer());
            bootstrap.option(ChannelOption.SO_BACKLOG, 128);
            bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);

            // Wait for the bind to finish before reporting success.
            ChannelFuture bindFuture = bootstrap.bind(port).sync();
            System.out.println("bind port success");

            // Block until the server channel is closed.
            bindFuture.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
            System.out.println("server stop");
        }
    }
}
五、编写客户端
编写html代码,保存为 wsChatClient.html。我这里的页面地址是 /Users/jack/Documents/TMP/wsChatClient.html,你需要把 HttpRequestHandler.java 中的文件路径改成自己机器上的实际路径。代码如下:
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>WebSocket Chat</title>
</head>
<body>
<script type="text/javascript">
    // Connection to the chat server; stays undefined when the browser has no WebSocket support.
    var socket;
    if (!window.WebSocket) {
        // Fall back to the prefixed implementation if the browser exposes one.
        window.WebSocket = window.MozWebSocket;
    }
    if (window.WebSocket) {
        socket = new WebSocket("ws://localhost:8088/ws");
        // Append every incoming message to the text area on its own line.
        socket.onmessage = function(event) {
            console.log("receive message:" + event.data)
            var ele = document.getElementById('respText');
            ele.value = ele.value + "\n" + event.data;
        };
        socket.onopen = function(event) {
            var ele = document.getElementById('respText');
            ele.value = "连接开启";
        }
        socket.onclose = function(event) {
            var ele = document.getElementById('respText');
            // Put the notice on its own line instead of gluing it to the last message.
            ele.value = ele.value + "\n" + "连接被关闭";
        }
    } else {
        alert("Your browser does not support WebSocket!");
    }

    // Sends the given text to the server if the connection is open.
    function sendMessage(message) {
        if (!window.WebSocket) {
            alert("Your browser does not support WebSocket!")
            return;
        }
        if (socket.readyState == WebSocket.OPEN) {
            socket.send(message);
            console.log("send message:" + message)
        } else {
            alert("The connection is not open.");
        }
    }
</script>
<form onsubmit="return false">
    <h3>WebSocket Chat</h3>
    <textarea name="" id="respText" style="width: 500px;height: 300px"></textarea>
    <input type="text" name="message" style="width: 300px" value="Welcome to chat.marding.cn">
    <input type="button" value="发送" onclick="sendMessage(this.form.message.value)">
</form>
</body>
</html>
六、测试
启动 WebSocketChatServer 后,在浏览器中访问 http://localhost:8088/ 即可打开聊天页面,可以打开多个页面进行测试~~~
我是马丁,喜欢可以点赞+关注哦~ See you ~