文章目录
- 前言
- 一、协议设计
- 1. 数据格式
- 2. 消息长度
- 3. 编码方式
- 4. 错误处理
- 5. 安全性
- 二、协议解析
- 1. 消息分隔
- 2. 粘包与半包处理
- 3. 校验机制
- 三、为什么需要协议?
- 四、redis 协议
- 五、HTTP 协议
- 六、自定义协议要素
- 编解码器
- 💡 什么时候可以加 @Sharable
前言
在网络通信中,设计和解析网络协议是确保数据可靠传输的关键环节。网络协议定义了数据在网络中传输时必须遵循的规则和格式,包括数据的组织方式、传输方式以及如何处理错误和异常情况。
一、协议设计
1. 数据格式
- 消息头:通常包含消息的元数据,如消息长度、消息类型、源地址、目标地址等。
- 消息体:包含实际的数据内容。
- 校验码:可选字段,用于检测传输过程中的数据完整性,如CRC校验码。
2. 消息长度
- 定长消息:所有消息都具有固定的长度,易于解析但不够灵活。
- 变长消息:消息长度可变,需要在消息头中指定消息长度,以便接收方正确解析。
3. 编码方式
- 文本编码:如JSON、XML等,易于阅读和调试,但解析效率较低。
- 二进制编码:如Protocol Buffers、Thrift等,解析效率高,但可读性较差。
4. 错误处理
- 重传机制:当数据包丢失或损坏时,请求重新发送。
- 超时机制:设置合理的超时时间,超过时间未收到确认则重传。
- 错误码:定义错误码以标识不同的错误类型,便于错误处理。
5. 安全性
- 加密:使用SSL/TLS等协议加密传输的数据。
- 认证:确保消息来源的真实性和合法性。
二、协议解析
1. 消息分隔
- 定长分隔:根据固定的长度来区分消息。
- 特殊分隔符:如换行符(\n)、定界符等。
- 消息头:根据消息头中指定的长度来区分消息。
2. 粘包与半包处理
- 粘包:多个消息合并成一个数据包,需要根据消息头或定界符来识别消息边界。
- 半包:一个消息被分割成多个数据包,需要累积接收到的所有片段才能构成完整消息。
3. 校验机制
- CRC校验:接收方计算接收到的消息的CRC值并与消息中携带的CRC值比较,确保数据完整性。
- MD5/SHA哈希:对于大文件传输,使用哈希值校验数据完整性
三、为什么需要协议?
TCP/IP 中消息传输基于流的方式,没有边界。
协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则
例如:在网络上传输 :
下雨天留客天留我不留
是中文一句著名的无标点符号句子,在没有标点符号情况下,这句话有数种拆解方式,而意思却是完全不同,所以常被用作讲述标点符号的重要性
一种解读
下雨天留客,天留,我不留
另一种解读
下雨天,留客天,留我不?留
如何设计协议呢?其实就是给网络传输的信息加上“标点符号”。但通过分隔符来断句不是很好,因为分隔符本身如果用于传输,那么必须加以区分。因此,下面一种协议较为常用
定长字节表示内容长度 + 实际内容
例如,假设一个中文字符长度为 3,按照上述协议的规则,发送信息方式如下,就不会被接收方弄错意思了
0f下雨天留客06天留09我不留
四、redis 协议
使用redis 协议举例
如发送命令:set key value
- redis要求先发送 * 加 数组的长度/个数(redis协议需要将set、key、value放到一个数组中)
- 要求发送$ 加 每个(命令/键值)的长度
- 要求多个部分之间要用回车换行分隔
如:set name zhangsan
*3
$3
set
$4
name
$8
zhangsan
@Slf4j
public class Test01Redis {
public static void main(String[] args) {
final byte[] LINE = {13, 10};//13.回车 10.换行
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
//channelActive:连接建立发送命令
@Override
public void channelActive(ChannelHandlerContext ctx) {
ByteBuf buf = ctx.alloc().buffer();
buf.writeBytes("*3".getBytes());
buf.writeBytes(LINE);//LINE:回车换行
buf.writeBytes("$3".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("set".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("$4".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("name".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("$8".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("zhangsan".getBytes());
buf.writeBytes(LINE);
ctx.writeAndFlush(buf);
}
//接受redis返回的消息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("channelRead");
System.out.println(buf.toString(Charset.defaultCharset()));
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("localhost", 6379).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("client error", e);
} finally {
worker.shutdownGracefully();
}
}
}
运行以上代码后即可在redis中查询到对应的kv数据
五、HTTP 协议
在拿http 协议举例
@Slf4j
public class Test02Http {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new HttpServerCodec());
// ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
// @Override
// public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// log.debug("{}", msg.getClass());
//
// if (msg instanceof HttpRequest) { // 请求行,请求头
//
// } else if (msg instanceof HttpContent) { //请求体
//
// }
// }
// });
//现在只处理HttpRequest类型的消息(选择处理),HttpContent这里会选择跳过
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
// 获取请求
log.debug(msg.uri());
// 返回响应
DefaultFullHttpResponse response =
new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
byte[] bytes = "<h1>Hello, world!</h1>".getBytes();
//设置响应体长度
response.headers().setInt(CONTENT_LENGTH, bytes.length);
response.content().writeBytes(bytes);
// 写回响应
ctx.writeAndFlush(response);
}
});
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
启动以上代码后发送http请求
六、自定义协议要素
消息内容:
- 魔数,用来在第一时间判定是否是无效数据包
- 版本号,可以支持协议的升级
- 序列化算法,消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk
- 指令类型,是登录、注册、单聊、群聊… 跟业务相关
- 请求序号,为了双工通信,提供异步能力
- 正文长度
- 消息正文
编解码器
根据上面的要素,设计一个登录请求消息和登录响应消息,并使用 Netty 完成收发
@Slf4j
//@ChannelHandler.Sharable//表示可以多个channel共享当前Handler
public class MessageCodec extends ByteToMessageCodec<Message> {
@Override
public void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
System.out.println("执行MessageCodec.encode()");
// 1. 4 字节的魔数(暗号)
out.writeBytes(new byte[]{1, 2, 3, 4});
// 2. 1 字节的版本,
out.writeByte(1);
// 3. 1 字节的序列化方式 jdk 0 , json 1(自定义约定)
out.writeByte(0);
// 4. 1 字节的指令类型
out.writeByte(msg.getMessageType());
// 5. 4 个字节的请求序号
out.writeInt(msg.getSequenceId());
// 无意义,对齐填充(使满足2的n次方倍)
out.writeByte(0xff);
// 6. 获取内容的字节数组
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);//将对象转化为二进制字节数组
oos.writeObject(msg);
byte[] bytes = bos.toByteArray();
// 7. 长度
out.writeInt(bytes.length);
// 8. 写入内容
out.writeBytes(bytes);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("执行MessageCodec.decode()");
int magicNum = in.readInt();//read会改变读指针,get只根据索引找
byte version = in.readByte();
byte serializerType = in.readByte();//字节的序列化方式
byte messageType = in.readByte();
int sequenceId = in.readInt();
in.readByte();//对齐填充
int length = in.readInt();//长度
byte[] bytes = new byte[length];
in.readBytes(bytes, 0, length);
if(serializerType == 0){//如果是jdk
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
Message message = (Message) ois.readObject();
//注意这里magicNum显示为16909060,因为十六进制的01020304转为是十进制,为16909060
log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
log.debug("{}", message);
out.add(message);//netty约定需要将解码后的结果放到参数中,不然下一个header将无法拿到解码后的结果
}
}
}
import lombok.Data;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
/**
* 消息基类
*/
@Data
public abstract class Message implements Serializable {
private int sequenceId;
private int messageType;
public abstract int getMessageType();
//登录请求消息
public static final int LoginRequestMessage = 0;
private static final Map<Integer, Class<? extends Message>> messageClasses = new HashMap<>();
static {
messageClasses.put(LoginRequestMessage, LoginRequestMessage.class);
}
}
import lombok.Data;
import lombok.ToString;
@Data
@ToString(callSuper = true)
public class LoginRequestMessage extends Message {
private String username;
private String password;
public LoginRequestMessage() {
}
public LoginRequestMessage(String username, String password) {
this.username = username;
this.password = password;
}
@Override
public int getMessageType() {
return LoginRequestMessage;
}
}
测试类
public class TestMessageCodec {
public static void main(String[] args) throws Exception {
EmbeddedChannel channel = new EmbeddedChannel(
new LoggingHandler(),
//当将s1发送给解码器时,只拿到了前100个字节,此时发现不是一个完整的消息,不会继续传递给下面的handler
new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0),
new MessageCodec()
);
// encode
LoginRequestMessage message = new LoginRequestMessage("zhangsan", "123");
channel.writeOutbound(message);//消息出战时会经过MessageCodec(),此时会运行MessageCodec()的encode
Thread.sleep(1000);
// decode
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
new MessageCodec().encode(null, message, buf);//让ByteBuf有数据,模拟入站
//模拟半包,将buf切成两个
// ByteBuf s1 = buf.slice(0, 100);
// channel.writeInbound(s1);
ByteBuf s1 = buf.slice(0, 100);
ByteBuf s2 = buf.slice(100, buf.readableBytes() - 100);
s1.retain(); //引用计数++ --> 引用计数=2
System.out.println("接受s1");
channel.writeInbound(s1); // release 1
Thread.sleep(2000);
System.out.println("接受s2");
//注意:这里如果没有s1.retain()会报错:IllegalReferenceCountException: refCnt: 0, decrement: 1
//原因是slice()只是逻辑切割,物理上buf、s1、s2是公用一块内存,
// 而当运行channel.writeInbound(s1)时,会将s1的引用计数减为0(表示buf、s1、s2公用的内存被释放掉了),
// 所以运行channel.writeInbound(s2)时会报错
channel.writeInbound(s2);
}
}
输出
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 01 00 00 00 00 00 00 ff 00 00 00 c6 |................|
|00000010| ac ed 00 05 73 72 00 25 63 6e 2e 69 74 63 61 73 |....sr.%cn.itcas|
|00000020| 74 2e 6d 65 73 73 61 67 65 2e 4c 6f 67 69 6e 52 |t.message.LoginR|
|00000030| 65 71 75 65 73 74 4d 65 73 73 61 67 65 a0 3f 71 |equestMessage.?q|
|00000040| cb 31 45 b5 88 02 00 02 4c 00 08 70 61 73 73 77 |.1E.....L..passw|
|00000050| 6f 72 64 74 00 12 4c 6a 61 76 61 2f 6c 61 6e 67 |ordt..Ljava/lang|
|00000060| 2f 53 74 72 69 6e 67 3b 4c 00 08 75 73 65 72 6e |/String;L..usern|
|00000070| 61 6d 65 71 00 7e 00 01 78 72 00 19 63 6e 2e 69 |ameq.~..xr..cn.i|
|00000080| 74 63 61 73 74 2e 6d 65 73 73 61 67 65 2e 4d 65 |tcast.message.Me|
|00000090| 73 73 61 67 65 3d dd 19 a0 bc 07 47 cb 02 00 02 |ssage=.....G....|
|000000a0| 49 00 0b 6d 65 73 73 61 67 65 54 79 70 65 49 00 |I..messageTypeI.|
|000000b0| 0a 73 65 71 75 65 6e 63 65 49 64 78 70 00 00 00 |.sequenceIdxp...|
|000000c0| 00 00 00 00 00 74 00 03 31 32 33 74 00 08 7a 68 |.....t..123t..zh|
|000000d0| 61 6e 67 73 61 6e |angsan |
+--------+-------------------------------------------------+----------------+
通过以上打印的日志可以看出:
💡 什么时候可以加 @Sharable
- 当 handler 不保存状态(如多线程情况下半包问题)时,就可以安全地在多线程下被共享
- 但要注意对于编解码器类,不能继承 ByteToMessageCodec 或 CombinedChannelDuplexHandler 父类,他们的构造方法对 @Sharable 有限制
- 如果能确保编解码器不会保存状态,可以继承 MessageToMessageCodec 父类
在多线程情况下半包问题,A线程接受到的消息只发送了一半,B线程使用了另外一个channel,接受到了一个半包,这时候LengthFieldBasedFrameDecoder就会将A线程和B线程接受的数据拼接在一起。
所以当一个handler只要记录了多次消息之间的状态,就是线程不安全的。不能在多线程下同时使用这个handler(如LengthFieldBasedFrameDecoder)
只有沾包半包处理器需要每次创建新的对象,不能和其他channel共享。
其他的处理器如果没有在多个事件中共享的数据,如果没有则可以和其他channel共享。