写在前面
源码 。
本文看下netty如何自定义编解码器。为此netty专门定义抽象类io.netty.handler.codec.MessageToByteEncoder
和io.netty.handler.codec.ByteToMessageDecoder
,后续我们实现自定义的编解码器就继承这两个类来做。
1:正戏
server 启动类:
package com.dahuyou.netty.customercodec.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyServer {
public static void main(String[] args) {
new NettyServer().bing(7397);
}
private void bing(int port) {
//配置服务端NIO线程组
EventLoopGroup parentGroup = new NioEventLoopGroup(); //NioEventLoopGroup extends MultithreadEventLoopGroup Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
EventLoopGroup childGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(parentGroup, childGroup)
.channel(NioServerSocketChannel.class) //非阻塞模式
.option(ChannelOption.SO_BACKLOG, 128)
.childHandler(new MyChannelInitializer());
ChannelFuture f = b.bind(port).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
childGroup.shutdownGracefully();
parentGroup.shutdownGracefully();
}
}
}
MyChannelInitializer类:
package com.dahuyou.netty.customercodec.server;
import com.dahuyou.netty.customercodec.codec.MyDecoder;
import com.dahuyou.netty.customercodec.codec.MyEncoder;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) {
/*
*/
/*System.out.println("远端连接初始化,IP:" + channel.remoteAddress().getHostString() + ", port:" + channel.remoteAddress().getPort());*//*
*/
/* 解码器 *//*
// 基于换行符号,基于换行符来分割字符串???
// channel.pipeline().addLast(new LineBasedFrameDecoder(1024));
// 基于指定字符串【换行符,这样功能等同于LineBasedFrameDecoder】
// e.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, false, Delimiters.lineDelimiter()));
// 基于最大长度
// e.pipeline().addLast(new FixedLengthFrameDecoder(4));
// 解码转String,注意调整自己的编码格式GBK、UTF-8 byte->string
channel.pipeline().addLast(new StringDecoder(Charset.forName("GBK")));
// 增加内置编码器,这样向对端发送消息就不要转换为二进制数据了
channel.pipeline().addLast(new StringEncoder(Charset.forName("GBK")));
//在管道中添加我们自己的接收数据实现方法
channel.pipeline().addLast(new MyServerHandler());
\*/
// 这里我们就不使用string的decoder和encoder了,而是使用自定义的编解码器
// 解码器对应的抽象类是:ByteToMessageDecoder
// 编码器对应的抽象类是:MessageToByteEncoder
//自定义解码器
channel.pipeline().addLast(new MyDecoder());
//自定义编码器
channel.pipeline().addLast(new MyEncoder());
//在管道中添加我们自己的接收数据实现方法
channel.pipeline().addLast(new MyServerHandler());
}
}
其中的MyDecoder,MyEncoder就是需要我们自己的定义了编解码器了,如下:
package com.dahuyou.netty.customercodec.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.nio.charset.Charset;
import java.util.List;
/**
* 自定义的解码器
*/
//public class MyDecoder extends ByteToMessageDecoder<String> {
public class MyDecoder extends ByteToMessageDecoder {
int BASE_LENGTH = 4;
/*
一个完整包的开始标记
*/
private final byte START_LABEL = 0x02;
/*
一个完整包的结束标记
*/
private final byte END_LABEL = 0x03;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
//基础长度不足,我们设定基础长度为4
if (in.readableBytes() < BASE_LENGTH) {
return;
}
int beginIdx; //记录包头位置
while (true) {
// 获取包头开始的index
beginIdx = in.readerIndex();
// 标记包头开始的index
in.markReaderIndex();
// 读到了协议的开始标志,结束while循环
// if (in.readByte() == 0x02) {
if (in.readByte() == START_LABEL) {
break;
}
// 未读到包头,略过一个字节
// 每次略过,一个字节,去读取,包头信息的开始标记
in.resetReaderIndex();
in.readByte();
// 当略过,一个字节之后,
// 数据包的长度,又变得不满足
// 此时,应该结束。等待后面的数据到达
if (in.readableBytes() < BASE_LENGTH) {
return;
}
}
//剩余长度不足可读取数量[没有内容长度位]
int readableCount = in.readableBytes();
if (readableCount <= 1) {
in.readerIndex(beginIdx);
return;
}
//长度域占4字节,读取int
ByteBuf byteBuf = in.readBytes(1);
String msgLengthStr = byteBuf.toString(Charset.forName("GBK"));
int msgLength = Integer.parseInt(msgLengthStr);
//剩余长度不足可读取数量[没有结尾标识]
readableCount = in.readableBytes();
if (readableCount < msgLength + 1) {
in.readerIndex(beginIdx);
return;
}
ByteBuf msgContent = in.readBytes(msgLength);
//如果没有结尾标识,还原指针位置[其他标识结尾]
byte end = in.readByte();
// if (end != 0x03) {
if (end != END_LABEL) {
in.readerIndex(beginIdx);
return;
}
out.add(msgContent.toString(Charset.forName("GBK")));
}
/*
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
}*/
}
解码器定义了这样的规则,ASCII码02
作为开始字符,ASCII码03
作为结束字符,这两个字符都是不可见的控制字符,分别是STX(start of text),ETX(end of text),如下是对stx比较专业的描述:
STX 是 ASCII(American Standard Code for Information Interchange,美国信息交换标准代码)中定义的一个控制字符,其代表“Start of Text”,即正文开始。在数据传输或文本通信中,STX 字符用于标记一个结构化数据块(如纯文本消息)的起始点。它的十进制 ASCII 值是 2,十六进制值是 0x02。
二者对应的ASCII码为:
然后第三个字符作为长度,之后就是根据解析到的长度来读取数据了,并且最后验证读取数据后的下一个字符是03
。最终将数据转换为人类可读的字符串,如下图就是一个可被该解码器解析的例子:
即开始字符 数据长度 数据 结束字符
。
package com.dahuyou.netty.customercodec.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
//public class MyEncoder extends MessageToByteEncoder<String> {
public class MyEncoder extends MessageToByteEncoder {
@Override
protected void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
String msg = in.toString();
byte[] bytes = msg.getBytes();
byte[] send = new byte[bytes.length + 2];
System.arraycopy(bytes, 0, send, 1, bytes.length);
send[0] = 0x02;
send[send.length - 1] = 0x03;
out.writeInt(send.length);
out.writeBytes(send);
}
}
这里解码器可以暂时不用太关注,只是在数据的开头和结尾增加了STX和ETX,如下:
最后MyServerHandler的代码就比较简单了,如下:
package com.dahuyou.netty.customercodec.server;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.SocketChannel;
import java.text.SimpleDateFormat;
import java.util.Date;
public class MyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 通道激活
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
SocketChannel channel = (SocketChannel) ctx.channel();
System.out.println("远端连接初始化,IP:" + channel.remoteAddress().getHostString() + ", port:" + channel.remoteAddress().getPort());
// 通知客户端链接建立成功
String str = "notify channel build success: " + " " + new Date() + " " + channel.localAddress().getHostString() + "\r\n";
ctx.writeAndFlush(str);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 因为在初始化是已经设置了字符串的解码器,所以就不需要上述的手动读取二进制字节码的操作了,这就是框架的好处咯!
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息:" + msg);
// 通知客户端链消息发送成功
String str = "server received your msg: " + new Date() + " " + msg + "\r\n";
ctx.writeAndFlush(str);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
System.out.println("异常信息:\r\n" + cause.getMessage());
}
/**
* 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("客户端断开链接" + ctx.channel().localAddress().toString());
}
}
启动server,简单起见我们直接使用netassit测试:
以上代码相当于发送了byte数组[stx,4,‘a’.‘1’,‘b’,‘2’,etx]。
当然除了使用工具测试外,我们也可以程序来测试,主要是如下代码(详细看源码):
package com.dahuyou.netty.customercodec.client;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.SocketChannel;
import java.text.SimpleDateFormat;
import java.util.Date;
public class MyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("通道建立成功");
// 通知客户端链接建立成功
/*
String str = "hello server , i'm client very happy connect to you." + " " + new Date() + " " + ((SocketChannel) ctx.channel()).localAddress().getHostString() + "\r\n";
*/
String xxx = "a1b2";
System.out.println(xxx.getBytes());
byte[] bytes = xxx.getBytes();
// 增加stx length etx
byte[] send = new byte[bytes.length + 3];
System.arraycopy(bytes, 0, send, 2, bytes.length);
send[0] = 0x02; // stx
send[1] = 0X34; // length 4的ASCII码是52 十六进制就是34
send[send.length - 1] = 0x03; // etx
ByteBuf buf = Unpooled.buffer(send.length/* + 3*/); // + 3 stx 长度 etx
buf.writeBytes(send);
// ctx.writeAndFlush(str);
ctx.writeAndFlush(buf);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//接收msg消息{与上一章节相比,此处已经不需要自己进行解码}
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " receive msg: " + msg);
//通知客户端链消息发送成功
// String str = "hello server received your msg: " + new Date() + " " + msg + "\r\n";
// ctx.writeAndFlush(str);
}
}
即channelActive中的逻辑,注意这里写入的都是ASCII码值。
写在后面
参考文章列表
Ascii完整码表(256个) 。
工具 › 开发类 › 进制转换 。
win的netassist TCP测试工具和Linux的nc工具使用 。