目录
四、应用
1、粘包与半包
现象分析
粘包
半包
本质
解决方案
短链接
定长解码器
行解码器
长度字段解码器——LTC
2、协议设计与解析
协议的作用
Redis协议
HTTP协议
自定义协议
组成要素
编码器与解码器
编写测试类
@Sharable注解
自定义编解码器能否使用@Sharable注解
3、在线聊天室
聊天室业务
用户登录接口
用户会话接口
群聊会话接口
整体结构
客户端代码结构
服务器代码结构
登录
客户端代码
服务器代码
运行结果
单聊
群聊
创建群聊
群聊聊天
加入群聊
退出
查看群聊成员
退出聊天室
连接假死
解决方法
四、应用
1、粘包与半包
粘包和半包问题是数据传输中比较常见的问题,所谓的粘包问题是指数据在传输时,在一条消息中读取到了另一条消息的部分数据,这种现象就叫做粘包。比如发送了两条消息,分别为“ABC”和“DEF”,那么正常情况下接收端也应该收到两条消息“ABC”和“DEF”,但接收端却收到的是“ABCD”,像这种情况就叫做粘包,半包问题是指接收端只收到了部分数据,而非完整的数据的情况就叫做半包。比如发送了一条消息是“ABC”,而接收端却收到的是“AB”和“C”两条信息,这种情况就叫做半包
只要是TCP协议的网络交互都有粘包和半包问题,因为TCP的传输是基于字节的传输方式,数据是以字节的形式进行传输的,并没有明确的边界。因此,在传输过程中,TCP没有办法直接识别数据包的边界,并且在流量控制下,TCP的字节传输还不稳定,当发送方连续发送多个数据包时,这些数据包可能会在网络传输的过程中合并或拆分,导致粘包和半包问题的出现,而UDP则没有这个问题,因为UDP的传输是基于数据报的
现象分析
粘包
现象
- 发送 abc def,接收 abcdef
原因
- 应用层
- 接收方 ByteBuf 设置太大(Netty 默认 1024)
- 传输层-网络层
- 滑动窗口:假设发送方 256 bytes 表示一个完整报文,但由于接收方处理不及时且窗口大小足够大(大于256 bytes),这 256 bytes 字节就会缓冲在接收方的滑动窗口中,当滑动窗口中缓冲了多个报文就会粘包
- Nagle 算法:会造成粘包
Nagle算法的原理如下:当TCP发送方需要发送一个小数据包时,Nagle算法会将这个数据包缓存起来,不立即发送。然后,TCP发送方会继续等待其他数据,直到以下两个条件中的任意一个满足后再发送数据:
- 接收到之前发送的数据的确认ACK。
- 发送方的发送缓冲区中的数据量达到一定的阈值(一般是MSS,即最大报文长度)。
之所以要缓存起来是因为一个TCP的请求都是要进行数据报头的添加,而IP的报头+TCP的报头 = 40字节,哪怕你只是发送了1个字节的数据,也会被封装为一个41字节的传输内容,那这样子粘包现象就很严重了,为此解决方法就是,缓存多点字节再一起发过来。然而,当发送方连续发送多个小数据包时,这些数据包可能会在网络传输的过程中被合并成一个大数据包,导致粘包问题的出现。这是因为Nagle算法本身不考虑数据包的边界,只是简单地将小数据包缓存起来,直到条件满足后发送。
半包
现象
- 发送 abcdef,接收 abc def
原因
- 应用层
- 接收方 ByteBuf 小于实际发送数据量
- 传输层-网络层
- 滑动窗口:假设接收方的窗口只剩了 128 bytes,发送方的报文大小是 256 bytes,这时接收方窗口中无法容纳发送方的全部报文,发送方只能先发送前 128 bytes,等待 ack 后才能发送剩余部分,这就造成了半包
- 数据链路层
- MSS 限制:当发送的数据超过 MSS (最大报文长度)限制后,会将数据切分发送,就会造成半包
本质
发生粘包与半包现象的本质是因为 TCP 是流式协议,消息无边界
解决方案
解决方案的思路和我这篇文章的处理方式类似,可以先看一下这个大概思路https://blog.csdn.net/weixin_73077810/article/details/131843387
短链接和长连接是描述客户端与服务器之间TCP连接持续时间的概念。
- 短链接:短链接通常指的是一次性的临时连接。在短链接中,客户端与服务器建立连接、交换数据后,连接就会关闭。在每次通信之前,需要重新建立连接,进行握手和协商。
短链接的优点是简单、轻量,适用于临时的、低频率的通信。但在高并发或频繁通信的场景中,频繁的连接建立和关闭会增加网络开销和延迟。
- 长连接:长连接指的是客户端与服务器之间持久的TCP连接。在长连接中,连接一经建立,客户端和服务器可以多次、长时间地进行双向通信。在连接建立后,数据可以实时、便捷地传输。
长连接的优点是减少连接建立和断开的开销,节省网络资源,减少延迟,提高通信效率。长连接常用于需要实时交互的应用,如即时通信、实时数据传输等。
需要注意的是,长连接可能会带来一些管理上的挑战。服务器需要维护大量的长连接,消耗资源,需要适当管理连接数和超时机制,防止资源浪费和死连接问题。
短链接
客户端每次向服务器发送数据以后,就与服务器断开连接,此时的消息边界为连接建立到连接断开。这时便无需使用滑动窗口等技术来缓冲数据,则不会发生粘包现象。但如果一次性数据发送过多,接收方无法一次性容纳所有数据,还是会发生半包现象,所以短链接无法解决半包现象
客户端代码改进
修改channelActive方法
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.debug("sending...");
ByteBuf buffer = ctx.alloc().buffer(16);
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
ctx.writeAndFlush(buffer);
// 使用短链接,每次发送完毕后就断开连接
ctx.channel().close();
}
定长解码器
客户端于服务器约定一个最大长度,保证客户端每次发送的数据长度都不会大于该长度。若发送数据长度不足则需要补齐至该长度
服务器接收数据时,将接收到的数据按照约定的最大长度进行拆分,即使发送过程中产生了粘包,也可以通过定长解码器将数据正确地进行拆分。服务端需要用到FixedLengthFrameDecoder
对数据进行定长解码,具体使用方法如下
ch.pipeline().addLast(new FixedLengthFrameDecoder(16));
客户端代码
客户端发送数据的代码如下
// 约定最大长度为16
final int maxLength = 16;
// 被发送的数据
char c = 'a';
// 向服务器发送10个报文
for (int i = 0; i < 10; i++) {
ByteBuf buffer = ctx.alloc().buffer(maxLength);
// 定长byte数组,未使用部分会以0进行填充
byte[] bytes = new byte[maxLength];
// 生成长度为0~15的数据
for (int j = 0; j < (int)(Math.random()*(maxLength-1)); j++) {
bytes[j] = (byte) c;
}
buffer.writeBytes(bytes);
c++;
// 将数据发送给服务器
ctx.writeAndFlush(buffer);
}
服务器代码
使用FixedLengthFrameDecoder
对粘包数据进行拆分,该handler需要添加在LoggingHandler
之前,保证数据被打印时已被拆分
// 通过定长解码器对粘包数据进行拆分
ch.pipeline().addLast(new FixedLengthFrameDecoder(16));
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
行解码器
行解码器的是通过分隔符对数据进行拆分来解决粘包半包问题的
- 可以通过
LineBasedFrameDecoder(int maxLength)
来拆分以换行符(\n or \r\n)为分隔符的数据 - 可以通过
DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf... delimiters)
来指定通过什么分隔符来拆分数据(可以传入多个分隔符)
两种解码器都需要传入数据的最大长度,若超出最大长度,会抛出TooLongFrameException
异常
以换行符 \n 为分隔符
客户端代码
// 约定最大长度为 64
final int maxLength = 64;
// 被发送的数据
char c = 'a';
for (int i = 0; i < 10; i++) {
ByteBuf buffer = ctx.alloc().buffer(maxLength);
// 生成长度为0~62的数据
Random random = new Random();
StringBuilder sb = new StringBuilder();
for (int j = 0; j < (int)(random.nextInt(maxLength-2)); j++) {
sb.append(c);
}
// 数据以 \n 结尾
sb.append("\n");
buffer.writeBytes(sb.toString().getBytes(StandardCharsets.UTF_8));
c++;
// 将数据发送给服务器
ctx.writeAndFlush(buffer);
}
服务器代码
// 通过行解码器对粘包数据进行拆分,以 \n 为分隔符
// 需要指定最大长度
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(64));
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
以自定义分隔符 \c 为分隔符
客户端代码
// 数据以 \c 结尾
sb.append("\\c");
buffer.writeBytes(sb.toString().getBytes(StandardCharsets.UTF_8));
服务器代码
// 将分隔符放入ByteBuf中
ByteBuf bufSet = ch.alloc().buffer().writeBytes("\\c".getBytes(StandardCharsets.UTF_8));
// 通过行解码器对粘包数据进行拆分,以 \c 为分隔符
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(64, ch.alloc().buffer().writeBytes(bufSet)));
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
长度字段解码器——LTC
在传送数据时可以在数据中添加一个用于表示有用数据长度的字段,在解码时读取出这个用于表明长度的字段,同时读取其他相关参数,即可知道最终需要的数据是什么样子的
LengthFieldBasedFrameDecoder
解码器可以提供更为丰富的拆分方法,其构造方法有五个参数
public LengthFieldBasedFrameDecoder(
int maxFrameLength,
int lengthFieldOffset, int lengthFieldLength,
int lengthAdjustment, int initialBytesToStrip)
参数解析
-
maxFrameLength 数据最大长度
- 表示数据的最大长度(包括附加信息、长度标识等内容)
-
lengthFieldOffset 数据长度标识的起始偏移量
- 用于指明数据第几个字节开始是用于标识有用字节长度的,因为前面可能还有其他附加信息
-
lengthFieldLength 数据长度标识所占字节数(用于指明有用数据的长度)
- 数据中用于表示有用数据长度的标识所占的字节数
-
lengthAdjustment 长度表示为有用数据的偏移量
- 用于指明数据长度标识和有用数据之间的距离,因为两者之间还可能有附加信息
-
initialBytesToStrip 数据读取起点
- 读取起点,不读取 0 ~ initialBytesToStrip 之间的数据
参数图解
使用
通过 EmbeddedChannel 对 handler 进行测试
public class EncoderStudy {
public static void main(String[] args) {
// 模拟服务器
// 使用EmbeddedChannel测试handler
EmbeddedChannel channel = new EmbeddedChannel(
// 数据最大长度为1KB,长度标识前后各有1个字节的附加信息,长度标识长度为4个字节(int)
new LengthFieldBasedFrameDecoder(1024, 1, 4, 1, 0),
new LoggingHandler(LogLevel.DEBUG)
);
// 模拟客户端,写入数据
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
send(buffer, "Hello");
channel.writeInbound(buffer);
send(buffer, "World");
channel.writeInbound(buffer);
}
private static void send(ByteBuf buf, String msg) {
// 得到数据的长度
int length = msg.length();
byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
// 将数据信息写入buf
// 写入长度标识前的其他信息
buf.writeByte(0xCA);
// 写入数据长度标识
buf.writeInt(length);
// 写入长度标识后的其他信息
buf.writeByte(0xFE);
// 写入具体的数据
buf.writeBytes(bytes);
}
}
运行结果
146 [main] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: 11B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| ca 00 00 00 05 fe 48 65 6c 6c 6f |......Hello |
+--------+-------------------------------------------------+----------------+
146 [main] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: 11B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| ca 00 00 00 05 fe 57 6f 72 6c 64 |......World |
+--------+-------------------------------------------------+----------------+
2、协议设计与解析
协议的作用
TCP/IP 中消息传输基于字节流的方式,没有边界
协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则
Redis协议
如果我们要向Redis服务器发送一条set name Nyima
的指令,需要遵守如下协议
// 该指令一共有3部分,每条指令之后都要添加回车与换行符
*3\r\n
// 第一个指令的长度是3
$3\r\n
// 第一个指令是set指令
set\r\n
// 下面的指令以此类推
$4\r\n
name\r\n
$5\r\n
Nyima\r\n
客户端代码如下
public class RedisClient {
static final Logger log = LoggerFactory.getLogger(StudyServer.class);
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
try {
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
// 打印日志
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 回车与换行符
final byte[] LINE = {'\r','\n'};
// 获得ByteBuf
ByteBuf buffer = ctx.alloc().buffer();
// 连接建立后,向Redis中发送一条指令,注意添加回车与换行
// set name Nyima
buffer.writeBytes("*3".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("$3".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("set".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("$4".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("name".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("$5".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("Nyima".getBytes());
buffer.writeBytes(LINE);
ctx.writeAndFlush(buffer);
}
});
}
})
.connect(new InetSocketAddress("localhost", 6379));
channelFuture.sync();
// 关闭channel
channelFuture.channel().close().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 关闭group
group.shutdownGracefully();
}
}
}
Redis中查询执行结果
HTTP协议
HTTP协议在请求行请求头中都有很多的内容,自己实现较为困难,可以使用HttpServerCodec
作为服务器端的解码器与编码器,来处理HTTP请求
// HttpServerCodec 中既有请求的解码器 HttpRequestDecoder 又有响应的编码器 HttpResponseEncoder
// Codec(CodeCombine) 一般代表该类既作为 编码器 又作为 解码器
public final class HttpServerCodec extends CombinedChannelDuplexHandler<HttpRequestDecoder, HttpResponseEncoder>
implements HttpServerUpgradeHandler.SourceCodec
服务器代码
public class HttpServer {
static final Logger log = LoggerFactory.getLogger(StudyServer.class);
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
new ServerBootstrap()
.group(group)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
// 作为服务器,使用 HttpServerCodec 作为编码器与解码器
ch.pipeline().addLast(new HttpServerCodec());
// 服务器只处理HTTPRequest,具体的限定取决于泛型
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) {
// 获得请求uri
log.debug(msg.uri());
// 获得完整响应,设置版本号与状态码
DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
// 设置响应内容
byte[] bytes = "<h1>Hello, World!</h1>".getBytes(StandardCharsets.UTF_8);
// 设置响应体长度,避免浏览器一直接收响应内容
response.headers().setInt(CONTENT_LENGTH, bytes.length);
// 设置响应体
response.content().writeBytes(bytes);
// 写回响应
ctx.writeAndFlush(response);
}
});
}
})
.bind(8080);
}
}
服务器负责处理请求并响应浏览器。所以只需要处理HTTP请求即可
// 服务器只处理HTTPRequest
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>()
获得请求后,需要返回响应给浏览器。需要创建响应对象DefaultFullHttpResponse
,设置HTTP版本号及状态码,为避免浏览器获得响应后,因为获得CONTENT_LENGTH
而一直空转,需要添加CONTENT_LENGTH
字段,表明响应体中数据的具体长度
// 获得完整响应,设置版本号与状态码
DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
// 设置响应内容
byte[] bytes = "<h1>Hello, World!</h1>".getBytes(StandardCharsets.UTF_8);
// 设置响应体长度,避免浏览器一直接收响应内容
response.headers().setInt(CONTENT_LENGTH, bytes.length);
// 设置响应体
response.content().writeBytes(bytes);
运行结果
浏览器
自定义协议
组成要素
- 魔数:用来在第一时间判定接收的数据是否为无效数据包
- 版本号:可以支持协议的升级
- 序列化算法:消息正文到底采用哪种序列化反序列化方式
- 如:json、protobuf、hessian、jdk
- 指令类型:是登录、注册、单聊、群聊… 跟业务相关
- 请求序号:为了双工通信,提供异步能力
- 正文长度
- 消息正文
编码器与解码器
public class MessageCodec extends ByteToMessageCodec<Message> {
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
// 设置魔数 4个字节
out.writeBytes(new byte[]{'N','Y','I','M'});
// 设置版本号 1个字节
out.writeByte(1);
// 设置序列化方式 1个字节
out.writeByte(1);
// 设置指令类型 1个字节
out.writeByte(msg.getMessageType());
// 设置请求序号 4个字节
out.writeInt(msg.getSequenceId());
// 为了补齐为2的次幂个字节,填充1个字节的数据,满足为16字节
out.writeByte(0xff);
// 获得序列化后的msg
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(msg);
byte[] bytes = bos.toByteArray();
// 获得并设置正文长度 长度用4个字节标识
out.writeInt(bytes.length);
// 设置消息正文
out.writeBytes(bytes);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 获取魔数
int magic = in.readInt();
// 获取版本号
byte version = in.readByte();
// 获得序列化方式
byte seqType = in.readByte();
// 获得指令类型
byte messageType = in.readByte();
// 获得请求序号
int sequenceId = in.readInt();
// 移除补齐字节
in.readByte();
// 获得正文长度
int length = in.readInt();
// 获得正文
byte[] bytes = new byte[length];
in.readBytes(bytes, 0, length);
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
Message message = (Message) ois.readObject();
// 将信息放入List中,传递给下一个handler
out.add(message);
// 打印获得的信息正文
System.out.println("===========魔数===========");
System.out.println(magic);
System.out.println("===========版本号===========");
System.out.println(version);
System.out.println("===========序列化方法===========");
System.out.println(seqType);
System.out.println("===========指令类型===========");
System.out.println(messageType);
System.out.println("===========请求序号===========");
System.out.println(sequenceId);
System.out.println("===========正文长度===========");
System.out.println(length);
System.out.println("===========正文===========");
System.out.println(message);
}
}
-
编码器与解码器方法源于父类ByteToMessageCodec,通过该类可以自定义编码器与解码器,泛型类型为被编码与被解码的类。此处使用了自定义类Message,代表消息
public class MessageCodec extends ByteToMessageCodec<Message>
- 编码器负责将附加信息与正文信息写入到ByteBuf中,其中附加信息总字节数最好为2n,不足需要补齐。正文内容如果为对象,需要通过序列化将其放入到ByteBuf中
-
解码器负责将ByteBuf中的信息取出,并放入List中,该List用于将信息传递给下一个handler
编写测试类
public class TestCodec {
static final org.slf4j.Logger log = LoggerFactory.getLogger(StudyServer.class);
public static void main(String[] args) throws Exception {
EmbeddedChannel channel = new EmbeddedChannel();
// 添加解码器,避免粘包半包问题
channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0));
// 开启控制台日志
channel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
// 绑定自定义编码器与解码器,其内部重写父类的encode和decode两个handler方法
channel.pipeline().addLast(new MessageCodec());
// 自定义的封装dto类
LoginRequestMessage user = new LoginRequestMessage("Nyima", "123");
// 测试编码与解码
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
// 内部将user正文数据存储到byteBuf的正文位置上
new MessageCodec().encode(null, user, byteBuf);
channel.writeInbound(byteBuf);
}
}
- 测试类中用到了LengthFieldBasedFrameDecoder,避免粘包半包问题
- 通过MessageCodec的encode方法将附加信息与正文写入到ByteBuf中,通过channel执行入站操作。入站时会调用decode方法进行解码
运行结果
@Sharable注解
为了提高handler的复用率,可以将handler创建为handler对象,然后在不同的channel中使用该handler对象进行处理操作
LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
// 不同的channel中使用同一个handler对象,提高复用率
channel1.pipeline().addLast(loggingHandler);
channel2.pipeline().addLast(loggingHandler);
但是并不是所有的handler都能通过这种方法来提高复用率的,例如LengthFieldBasedFrameDecoder
。如果多个channel中使用同一个LengthFieldBasedFrameDecoder对象,则可能发生如下问题
- channel1中收到了一个半包,LengthFieldBasedFrameDecoder发现不是一条完整的数据,则没有继续向下传播
- 此时channel2中也收到了一个半包,因为两个channel使用了同一个LengthFieldBasedFrameDecoder,存入其中的数据刚好拼凑成了一个完整的数据包。LengthFieldBasedFrameDecoder让该数据包继续向下传播,最终引发错误
为了提高handler的复用率,同时又避免出现一些并发问题,Netty中原生的handler中用@Sharable
注解来标明,该handler能否在多个channel中共享。
只有带有该注解,才能通过对象的方式被共享,否则无法被共享
自定义编解码器能否使用@Sharable注解
这需要根据自定义的handler的处理逻辑进行分析
我们的MessageCodec本身接收的是LengthFieldBasedFrameDecoder处理之后的数据,那么数据肯定是完整的,按分析来说是可以添加@Sharable注解的
但是实际情况我们并不能添加该注解,会抛出异常信息ChannelHandler cn.nyimac.study.day8.protocol.MessageCodec is not allowed to be shared
-
因为MessageCodec继承自ByteToMessageCodec,ByteToMessageCodec类的注解如下
-
这就意味着ByteToMessageCodec不能被多个channel所共享的
- 原因:因为该类的目标是:将ByteBuf转化为Message,意味着传进该handler的数据还未被处理过。所以传过来的ByteBuf可能并不是完整的数据,如果共享则会出现问题
如果想要共享,需要怎么办呢?
继承MessageToMessageDecoder即可。该类的目标是:将已经被处理的完整数据再次被处理。传过来的Message如果是被处理过的完整数据,那么被共享也就不会出现问题了,也就可以使用@Sharable注解了。实现方式与ByteToMessageCodec类似
@ChannelHandler.Sharable
public class MessageSharableCodec extends MessageToMessageCodec<ByteBuf, Message> {
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> out) throws Exception {
...
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
...
}
}
3、在线聊天室
聊天室业务
用户登录接口
public interface UserService {
/**
* 登录
* @param username 用户名
* @param password 密码
* @return 登录成功返回 true, 否则返回 false
*/
boolean login(String username, String password);
}
用户会话接口
public interface Session {
/**
* 绑定会话
* @param channel 哪个 channel 要绑定会话
* @param username 会话绑定用户
*/
void bind(Channel channel, String username);
/**
* 解绑会话
* @param channel 哪个 channel 要解绑会话
*/
void unbind(Channel channel);
/**
* 获取属性
* @param channel 哪个 channel
* @param name 属性名
* @return 属性值
*/
Object getAttribute(Channel channel, String name);
/**
* 设置属性
* @param channel 哪个 channel
* @param name 属性名
* @param value 属性值
*/
void setAttribute(Channel channel, String name, Object value);
/**
* 根据用户名获取 channel
* @param username 用户名
* @return channel
*/
Channel getChannel(String username);
}
群聊会话接口
public interface GroupSession {
/**
* 创建一个聊天组, 如果不存在才能创建成功, 否则返回 null
* @param name 组名
* @param members 成员
* @return 成功时返回组对象, 失败返回 null
*/
Group createGroup(String name, Set<String> members);
/**
* 加入聊天组
* @param name 组名
* @param member 成员名
* @return 如果组不存在返回 null, 否则返回组对象
*/
Group joinMember(String name, String member);
/**
* 移除组成员
* @param name 组名
* @param member 成员名
* @return 如果组不存在返回 null, 否则返回组对象
*/
Group removeMember(String name, String member);
/**
* 移除聊天组
* @param name 组名
* @return 如果组不存在返回 null, 否则返回组对象
*/
Group removeGroup(String name);
/**
* 获取组成员
* @param name 组名
* @return 成员集合, 如果群不存在或没有成员会返回 empty set
*/
Set<String> getMembers(String name);
/**
* 获取组成员的 channel 集合, 只有在线的 channel 才会返回
* @param name 组名
* @return 成员 channel 集合
*/
List<Channel> getMembersChannel(String name);
/**
* 判断群聊是否一被创建
* @param name 群聊名称
* @return 是否存在
*/
boolean isCreated(String name);
}
整体结构
-
client包:存放客户端相关类
-
message包:存放各种类型的消息
-
protocol包:存放自定义协议
-
server包:存放服务器相关类
- service包:存放用户相关类
- session包:单聊及群聊相关会话类
客户端代码结构
public class ChatClient {
static final Logger log = LoggerFactory.getLogger(ChatClient.class);
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
MessageSharableCodec messageSharableCodec = new MessageSharableCodec();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 自定义的协议解码粘半包处理器
ch.pipeline().addLast(new ProtocolFrameDecoder());
ch.pipeline().addLast(loggingHandler);
ch.pipeline().addLast(messageSharableCodec);
}
});
Channel channel = bootstrap.connect().sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
}
服务器代码结构
public class ChatServer {
static final Logger log = LoggerFactory.getLogger(ChatServer.class);
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
MessageSharableCodec messageSharableCodec = new MessageSharableCodec();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProtocolFrameDecoder());
ch.pipeline().addLast(loggingHandler);
ch.pipeline().addLast(messageSharableCodec);
}
});
Channel channel = bootstrap.bind(8080).sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
登录
客户端代码
客户端添加如下handler,分别处理登录、聊天等操作
@Slf4j
public class ChatClient {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
// 这是一个计数锁,只有当其维护的value减为0的时候才会释放
CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);
// 原子变量
AtomicBoolean LOGIN = new AtomicBoolean(false);
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProcotolFrameDecoder());
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
/**
* 创建连接时执行的处理器,用于执行登陆操作
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 开辟额外线程(不要让nio的线程被录入阻塞),用于用户登陆及后续操作
new Thread(()->{
Scanner scanner = new Scanner(System.in);
System.out.println("请输入用户名");
String username = scanner.next();
System.out.println("请输入密码");
String password = scanner.next();
// 创建包含登录信息的请求体
LoginRequestMessage message = new LoginRequestMessage(username, password);
// 发送到channel中,注意这里用ctx写出,因为他要从这里找前面的那些处理器进行加工
ctx.writeAndFlush(message);
// 校验登录结果,如果能获取到锁就说明登录成功
if (!loginStatus.get()) {
// 登陆失败,关闭channel并返回
ctx.channel().close();
return;
}
// 登录成功后,执行其他操作
while (true) {
System.out.println("==================================");
System.out.println("send [username] [content]");
System.out.println("gsend [group name] [content]");
System.out.println("gcreate [group name] [m1,m2,m3...]");
System.out.println("gmembers [group name]");
System.out.println("gjoin [group name]");
System.out.println("gquit [group name]");
System.out.println("quit");
System.out.println("==================================");
String command = scanner.nextLine();
// 获得指令及其参数,并发送对应类型消息
// 注意这里!!!!!你发送的消息类型决定了在服务器端处理的handeler
String[] commands = command.split(" ");
switch (commands[0]){
case "send":
ctx.writeAndFlush(new ChatRequestMessage(username, commands[1], commands[2]));
break;
case "gsend":
ctx.writeAndFlush(new GroupChatRequestMessage(username,commands[1], commands[2]));
break;
case "gcreate":
// 分割,获得群员名
String[] members = commands[2].split(",");
Set<String> set = new HashSet<>(Arrays.asList(members));
// 把自己加入到群聊中
set.add(username);
ctx.writeAndFlush(new GroupCreateRequestMessage(commands[1],set));
break;
case "gmembers":
ctx.writeAndFlush(new GroupMembersRequestMessage(commands[1]));
break;
case "gjoin":
ctx.writeAndFlush(new GroupJoinRequestMessage(username, commands[1]));
break;
case "gquit":
ctx.writeAndFlush(new GroupQuitRequestMessage(username, commands[1]));
break;
case "quit":
ctx.channel().close();
return;
default:
System.out.println("指令有误,请重新输入");
continue;
}
}
}, "login channel").start();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 注意噢,这个消息的接收是在这里进行输出控制台
log.debug("{}", msg);
if (msg instanceof LoginResponseMessage) {
// 如果是登录响应信息
LoginResponseMessage message = (LoginResponseMessage) msg;
boolean isSuccess = message.isSuccess();
// 登录成功,设置登陆标记
if (isSuccess) {
loginStatus.set(true);
}
// 登陆后,唤醒登陆线程,原始计数为1,减了一个后就变为0,释放锁
waitLogin.countDown();
}
}
});
服务器代码
@Slf4j
public class ChatServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProcotolFrameDecoder());
// 日志
ch.pipeline().addLast(LOGGING_HANDLER);
// 自定义的协议编解码操作
ch.pipeline().addLast(MESSAGE_CODEC);
// 只对LoginRequestMessage解码结果进行操作
ch.pipeline().addLast(new SimpleChannelInboundHandler<LoginRequestMessage>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception {
String username = msg.getUsername();
String password = msg.getPassword();
// 拿着账号密码去后端做校验,校验通过还要把用户名和channel的对
应关系也要存储起来,用来实现单聊的时候看对方在不在线
boolean login = UserServiceFactory.getUserService().login(username, password);
LoginResponseMessage message;
if(login) {
message = new LoginResponseMessage(true, "登录成功");
} else {
message = new LoginResponseMessage(false, "用户名或密码不正确");
}
ctx.writeAndFlush(message);
}
});
}
});
Channel channel = serverBootstrap.bind(8080).sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
// 该handler处理登录请求
LoginRequestMessageHandler loginRequestMessageHandler = new LoginRequestMessageHandler();
ch.pipeline().addLast(new LoginRequestMessageHandler());
运行结果
客户端
5665 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.protocol.MessageSharableCodec - 1314474317, 1, 1, 1, 0, 279
5667 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.protocol.MessageSharableCodec - message:AbstractResponseMessage{success=true, reason='登陆成功'}
5667 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=true, reason='登陆成功'}
success
服务器
11919 [nioEventLoopGroup-3-1] DEBUG cn.nyimac.study.day8.protocol.MessageSharableCodec - 1314474317, 1, 1, 0, 0, 217
11919 [nioEventLoopGroup-3-1] DEBUG cn.nyimac.study.day8.protocol.MessageSharableCodec - message:LoginRequestMessage{username='Nyima', password='123'}
7946 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x8e7c07f6, L:/127.0.0.1:8080 - R:/127.0.0.1:60572] WRITE: 295B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 4e 59 49 4d 01 01 01 00 00 00 00 ff 00 00 01 17 |NYIM............|
|00000010| ac ed 00 05 73 72 00 31 63 6e 2e 6e 79 69 6d 61 |....sr.1cn.nyima|
|00000020| 63 2e 73 74 75 64 79 2e 64 61 79 38 2e 6d 65 73 |c.study.day8.mes|
|00000030| 73 61 67 65 2e 4c 6f 67 69 6e 52 65 73 70 6f 6e |sage.LoginRespon|
|00000040| 73 65 4d 65 73 73 61 67 65 e2 34 49 24 72 52 f3 |seMessage.4I$rR.|
|00000050| 07 02 00 00 78 72 00 34 63 6e 2e 6e 79 69 6d 61 |....xr.4cn.nyima|
|00000060| 63 2e 73 74 75 64 79 2e 64 61 79 38 2e 6d 65 73 |c.study.day8.mes|
|00000070| 73 61 67 65 2e 41 62 73 74 72 61 63 74 52 65 73 |sage.AbstractRes|
|00000080| 70 6f 6e 73 65 4d 65 73 73 61 67 65 b3 7e 19 32 |ponseMessage.~.2|
|00000090| 9b 88 4d 7b 02 00 02 5a 00 07 73 75 63 63 65 73 |..M{...Z..succes|
|000000a0| 73 4c 00 06 72 65 61 73 6f 6e 74 00 12 4c 6a 61 |sL..reasont..Lja|
|000000b0| 76 61 2f 6c 61 6e 67 2f 53 74 72 69 6e 67 3b 78 |va/lang/String;x|
|000000c0| 72 00 24 63 6e 2e 6e 79 69 6d 61 63 2e 73 74 75 |r.$cn.nyimac.stu|
|000000d0| 64 79 2e 64 61 79 38 2e 6d 65 73 73 61 67 65 2e |dy.day8.message.|
|000000e0| 4d 65 73 73 61 67 65 dd e9 84 b7 21 db 18 52 02 |Message....!..R.|
|000000f0| 00 02 49 00 0b 6d 65 73 73 61 67 65 54 79 70 65 |..I..messageType|
|00000100| 49 00 0a 73 65 71 75 65 6e 63 65 49 64 78 70 00 |I..sequenceIdxp.|
|00000110| 00 00 00 00 00 00 00 01 74 00 0c e7 99 bb e9 99 |........t.......|
|00000120| 86 e6 88 90 e5 8a 9f |....... |
+--------+-------------------------------------------------+----------------+
单聊
客户端输入send username content
即可发送单聊消息,需要服务器端添加处理ChatRequestMessage的handler
@ChannelHandler.Sharable // 必须添加该注解
// 表明只对ChatRequestMessage的消息进行加工
public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ChatRequestMessage msg) throws Exception {
// 获得user所在的channel
Channel channel = SessionFactory.getSession().getChannel(msg.getTo());
// 如果双方都在线
if (channel != null) {
// 通过接收方与服务器之间的channel发送信息,注意,这里不是写到byteBuf去
channel.writeAndFlush(new ChatResponseMessage(msg.getFrom(), msg.getContent()));
} else {
// 通过发送方与服务器之间的channel发送消息
ctx.writeAndFlush(new ChatResponseMessage(false, "对方用户不存在或离线,发送失败"));
}
}
}
// 该handler处理单聊请求
ChatRequestMessageHandler chatRequestMessageHandler = new ChatRequestMessageHandler();
ch.pipeline().addLast(chatRequestMessageHandler);
运行结果
发送方(zhangsan)
send Nyima hello
接收方(Nyima)
// 收到zhangsan发来的消息
20230 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - ChatResponseMessage{from='zhangsan', content='hello'}
群聊
创建群聊
添加处理GroupCreateRequestMessage
的handler
@ChannelHandler.Sharable
// 表明只对GroupCreateRequestMessage的消息进行加工
public class GroupCreateMessageHandler extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, GroupCreateRequestMessage msg) throws Exception {
// 获得要创建的群聊名,ctx对应的是发送这个创建群聊的业务请求的人的这个channel
String groupName = msg.getGroupName();
// 获得要创建的群聊的成员组(首次拉起形成群聊的那几个人,包含自身才行)
Set<String> members = msg.getMembers();
// 判断该群聊是否创建过,未创建返回null并创建群聊
Group group = GroupSessionFactory.getGroupSession().createGroup(groupName, members);
if (group == null) {
// 向群的创建者发送创建成功消息
GroupCreateResponseMessage groupCreateResponseMessage = new GroupCreateResponseMessage(true, groupName + "创建成功");
ctx.writeAndFlush(groupCreateResponseMessage);
// 获得在线群员的channel,给群员发送入群聊消息
List<Channel> membersChannel = GroupSessionFactory.getGroupSession().getMembersChannel(groupName);
groupCreateResponseMessage = new GroupCreateResponseMessage(true, "您已被拉入"+groupName);
// 给每个在线群员发送消息
for(Channel channel : membersChannel) {
channel.writeAndFlush(groupCreateResponseMessage);
}
} else {
// 发送失败消息给创建人
GroupCreateResponseMessage groupCreateResponseMessage = new GroupCreateResponseMessage(false, groupName + "已存在");
ctx.writeAndFlush(groupCreateResponseMessage);
}
}
}
// 该handler处理创建群聊请求
GroupCreateMessageHandler groupCreateMessageHandler = new GroupCreateMessageHandler();
ch.pipeline().addLast(groupCreateMessageHandler);
运行结果
创建者客户端
// 首次创建
gcreate Netty学习 zhangsan,lisi
31649 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=true, reason='Netty学习创建成功'}
15244 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=true, reason='您已被拉入Netty学习'}
// 再次创建
gcreate Netty学习 zhangsan,lisi
40771 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=false, reason='Netty学习已存在'}
群员客户端
28788 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=true, reason='您已被拉入Netty学习'}
群聊聊天
@ChannelHandler.Sharable
// 表明只对GroupChatRequestMessage的消息进行加工
public class GroupChatMessageHandler extends SimpleChannelInboundHandler<GroupChatRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, GroupChatRequestMessage msg) throws Exception {
String groupName = msg.getGroupName();
GroupSession groupSession = GroupSessionFactory.getGroupSession();
// 判断群聊是否存在
boolean isCreated = groupSession.isCreated(groupName);
if (isCreated) {
// 给群员发送信息
List<Channel> membersChannel = groupSession.getMembersChannel(groupName);
for(Channel channel : membersChannel) {
channel.writeAndFlush(new GroupChatResponseMessage(msg.getFrom(), msg.getContent()));
}
} else {
ctx.writeAndFlush(new GroupChatResponseMessage(false, "群聊不存在"));
}
}
}
// 该handler处理群聊聊天
GroupChatMessageHandler groupChatMessageHandler = new GroupChatMessageHandler();
ch.pipeline().addLast(groupChatMessageHandler);
运行结果
发送方(群聊存在)
gsend Netty学习 你们好
45408 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - GroupChatResponseMessage{from='zhangsan', content='你们好'}
接收方
48082 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - GroupChatResponseMessage{from='zhangsan', content='你们好'}
发送方(群聊不存在)
gsend Spring学习 你们好
25140 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=false, reason='群聊不存在'}
加入群聊
@ChannelHandler.Sharable
public class GroupJoinMessageHandler extends SimpleChannelInboundHandler<GroupJoinRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, GroupJoinRequestMessage msg) throws Exception {
GroupSession groupSession = GroupSessionFactory.getGroupSession();
// 判断该用户是否在群聊中
Set<String> members = groupSession.getMembers(msg.getGroupName());
boolean joinFlag = false;
// 群聊存在且用户未加入,才能加入
if (!members.contains(msg.getUsername()) && groupSession.isCreated(msg.getGroupName())) {
joinFlag = true;
}
if (joinFlag) {
// 加入群聊
groupSession.joinMember(msg.getGroupName(), msg.getUsername());
ctx.writeAndFlush(new GroupJoinResponseMessage(true,"加入"+msg.getGroupName()+"成功"));
} else {
ctx.writeAndFlush(new GroupJoinResponseMessage(false, "加入失败,群聊未存在或您已加入该群聊"));
}
}
}
// 该handler处理加入群聊
GroupJoinMessageHandler groupJoinMessageHandler = new GroupJoinMessageHandler();
ch.pipeline().addLast(groupJoinMessageHandler);
运行结果
正常加入群聊
94921 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=true, reason='加入Netty学习成功'}
加入不能存在或已加入的群聊
44025 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=false, reason='加入失败,群聊未存在或您已加入该群聊'}
退出
@ChannelHandler.Sharable
public class GroupQuitMessageHandler extends SimpleChannelInboundHandler<GroupQuitRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, GroupQuitRequestMessage msg) throws Exception {
GroupSession groupSession = GroupSessionFactory.getGroupSession();
String groupName = msg.getGroupName();
Set<String> members = groupSession.getMembers(groupName);
String username = msg.getUsername();
// 判断用户是否在群聊中以及群聊是否存在
boolean joinFlag = false;
if (groupSession.isCreated(groupName) && members.contains(username)) {
// 可以退出
joinFlag = true;
}
if (joinFlag) {
// 退出成功
groupSession.removeMember(groupName, username);
ctx.writeAndFlush(new GroupQuitResponseMessage(true, "退出"+groupName+"成功"));
} else {
// 退出失败
ctx.writeAndFlush(new GroupQuitResponseMessage(false, "群聊不存在或您未加入该群,退出"+groupName+"失败"));
}
}
}
// 该handler处理退出群聊
GroupQuitMessageHandler groupQuitMessageHandler = new GroupQuitMessageHandler();
ch.pipeline().addLast(groupQuitMessageHandler);
运行结果
正常退出
32282 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=true, reason='退出Netty学习成功'}
退出不存在或未加入的群聊
67404 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=false, reason='群聊不存在或您未加入该群,退出Netty失败'}
查看群聊成员
@ChannelHandler.Sharable
public class GroupMembersMessageHandler extends SimpleChannelInboundHandler<GroupMembersRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, GroupMembersRequestMessage msg) throws Exception {
ctx.writeAndFlush(new GroupMembersResponseMessage(GroupSessionFactory.getGroupSession().getMembers(msg.getGroupName())));
}
}
// 该handler处理查看成员
GroupMembersMessageHandler groupMembersMessageHandler = new GroupMembersMessageHandler();
ch.pipeline().addLast(groupMembersMessageHandler);
运行结果
46557 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - GroupMembersResponseMessage{members=[zhangsan, Nyima]}
退出聊天室
@ChannelHandler.Sharable
public class QuitHandler extends ChannelInboundHandlerAdapter {
/**
* 断开连接时触发 Inactive事件
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// 解绑
SessionFactory.getSession().unbind(ctx.channel());
}
/**
* 异常退出,需要解绑
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 解绑
SessionFactory.getSession().unbind(ctx.channel());
}
}
// 该handler处理退出聊天室
ch.pipeline().addLast(quitHandler);
GroupMembersMessageHandler groupMembersMessageHandler = new GroupMembersMessageHandler();
退出时,客户端会关闭channel并返回
case "quit":
// 关闭channel并返回
ctx.channel().close();
return;
连接假死
原因
- 网络设备出现故障,例如网卡,机房等,底层的 TCP 连接已经断开了,但应用程序没有感知到,仍然占用着资源
- 公网网络不稳定,出现丢包。如果连续出现丢包,这时现象就是客户端数据发不出去,服务端也一直收不到数据,会白白地消耗资源
- 应用程序线程阻塞,无法进行数据读写
问题
- 假死的连接占用的资源不能自动释放
- 向假死的连接发送数据,得到的反馈是发送超时
解决方法
可以添加
IdleStateHandler
对空闲时间进行检测,通过构造函数可以传入三个参数
- readerIdleTimeSeconds 读空闲经过的秒数
- writerIdleTimeSeconds 写空闲经过的秒数
- allIdleTimeSeconds 读和写空闲经过的秒数
当指定时间内未发生读或写事件时,会触发特定事件
- 读空闲会触发
READER_IDLE
- 写空闲会触发
WRITE_IDLE
- 读和写空闲会触发
ALL_IDEL
将定时任务的周期设置为 0,这意味着不会触发该空闲状态事件。
想要处理这些事件,需要自定义事件处理函数
服务器端代码
// 用于空闲连接的检测,5s内未读到数据,会触发READ_IDLE事件
ch.pipeline().addLast(new IdleStateHandler(5, 0, 0));
// 添加双向处理器,负责处理READER_IDLE事件
/*ChannelDuplexHandler 是 Netty 框架中的一个特殊类,它是用来处理网络通信中的读写事件的双向处理器。它扩展
了ChannelInboundHandler和ChannelOutboundHandler,同时负责处理从网络中读取到的数据以及将数据写入到网络中。*/
ch.pipeline().addLast(new ChannelDuplexHandler() {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 获得事件
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
// 断开连接
ctx.channel().close();
}
}
});
- 使用
IdleStateHandler
进行空闲检测 - 使用双向处理器
ChannelDuplexHandler
对入站与出站事件进行处理IdleStateHandler
中的事件为特殊事件,需要实现ChannelDuplexHandler
的userEventTriggered
方法,判断事件类型并自定义处理方式,来对事件进行处理
为避免因非网络等原因引发的WRITER_IDLE事件,比如网络情况良好,只是用户本身没有输入数据,这时发生WRITER_IDLE事件,直接让服务器断开连接是不可取的
为避免此类情况,需要在客户端向服务器发送心跳包,发送频率要小于服务器设置的IdleTimeSeconds
,一般设置为其值的一半
客户端代码
// 发送心跳包,让服务器知道客户端在线
// 3s未发生WRITER_IDLE,就像服务器发送心跳包
// 该值为服务器端设置的READER_IDLE触发时间的一半左右
ch.pipeline().addLast(new IdleStateHandler(0, 3, 0));
ch.pipeline().addLast(new ChannelDuplexHandler() {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.WRITER_IDLE) {
// 发送心跳包
ctx.writeAndFlush(new PingMessage());
}
}
});