背景
netty框架中,自定义解码器的起点是ByteBuf类型的消息, 自定义编码器的终点是ByteBuf类型。
1.解码器
业务解码器的起点是ByteBuf类型
netty中可以通过继承MessageToMessageEncoder类自定义解码器类。MessageToMessageEncoder继承自ChannelInboundHandlerAdapter,ChannelInboundHandlerAdapter使用默认方式(不处理,向下传递事件)实现了所有的Inbound接口。因此,MessageToMessageEncoder只需要重写channelRead方法,并在该方法中提取消息、转换消息、通过ChannelInvoker将转换后的消息以channelRead事件发向pipeline即可。
MessageToMessageEncoder抽象类的实现如下:
public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter {
private final TypeParameterMatcher matcher;
protected MessageToMessageDecoder() {
matcher = TypeParameterMatcher.find(this, MessageToMessageDecoder.class, "I");
}
protected MessageToMessageDecoder(Class<? extends I> inboundMessageType) {
matcher = TypeParameterMatcher.get(inboundMessageType);
}
public boolean acceptInboundMessage(Object msg) throws Exception {
return matcher.match(msg);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
CodecOutputList out = CodecOutputList.newInstance();
try {
if (acceptInboundMessage(msg)) {
I cast = (I) msg;
try {
decode(ctx, cast, out);
} finally {
ReferenceCountUtil.release(cast);
}
} else {
out.add(msg);
}
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
try {
int size = out.size();
for (int i = 0; i < size; i++) {
ctx.fireChannelRead(out.getUnsafe(i));
}
} finally {
out.recycle();
}
}
}
protected abstract void decode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
}
1.1 类型的匹配器
MessageToMessageDecoder内部维护了一个TypeParameterMatcher类型的匹配器对象matcher,用于指定解码器可以处理的消息类型。可通过构造函数为其设置类型,也可通过泛型指定:
// 使用泛型类型
protected MessageToMessageDecoder() {
matcher = TypeParameterMatcher.find(this, MessageToMessageDecoder.class, "I");
}
// 子类调用MessageToMessageDecoder构造器时,传入类型
protected MessageToMessageDecoder(Class<? extends I> inboundMessageType) {
matcher = TypeParameterMatcher.get(inboundMessageType);
}
一般,通过泛型指定解码器处理的消息对象,即使用MessageToMessageDecoder的无参构造函数。
acceptInboundMessage方法封装matcher的实现,返回布尔值,表示是否支持处理msg消息类型:
public boolean acceptInboundMessage(Object msg) throws Exception {
return matcher.match(msg);
}
根据matcher的match方法:
private static final class ReflectiveMatcher extends TypeParameterMatcher {
private final Class<?> type;
//...
@Override
public boolean match(Object msg) {
// msg消息是否为type类型或者其子类型
return type.isInstance(msg);
}
}
1.2 channelRead方法
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 构造List列表对象,存储解码后的对象
CodecOutputList out = CodecOutputList.newInstance();
try {
// 判断是否支持处理消息
if (acceptInboundMessage(msg)) {
I cast = (I) msg;
try {
// 处理消息,将cast对象解码后的结果存放到out列表中
decode(ctx, cast, out);
} finally {
ReferenceCountUtil.release(cast);
}
} else {
// 不处理消息,以原样保存
out.add(msg);
}
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
try {
int size = out.size();
// 遍历列表,依次向pipeline触发解码后的对象
for (int i = 0; i < size; i++) {
ctx.fireChannelRead(out.getUnsafe(i));
}
} finally {
out.recycle();
}
}
}
逻辑较为清晰:
[1] 构造列表对象out,用于临时存放解码后的消息;
[2] 判断当前解码器是否可以处理该消息,不可以处理,直接添加到out中;可以处理,调用decode方法解码消息,解码结果都添加到out中;
[3] 遍历out列表,将消息以ChannelRead事件传递给向pipeline;
[4] out清理、回收再利用;
1.3 decode方法
decode方法是实际进行消息转换的逻辑,由子类根据业务具体实现:
protected abstract void decode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
将msg解码,解码后的对象存放在out中;由于out是数组,因此可以从msg中解码出一个对象,也可以解码出多个。如下所示:
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {
out.add(msg.toString(charset));
}
将ByteBuf类型的msg消息转为一个String类型的对象;
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {
String[] decodedMsgs = msg.toString(charset).split(";");
for (String decodedMsg: decodedMsgs) {
out.add(decodedMsg);
}
}
将ByteBuf转为String,并按照;分隔符进行拆分,每个字符串作为一个消息对象。
2.解码器案例
案例的结构图如下所示,消息流入解码器和流出时的消息类型会发生变化:
引入三个解码器和一个业务Handler:
[1] 编码器1实现ByteBuf->String类型的转换;
[2] 编码器2实现String->Message1类型的转换;
[3] 编码器3实现Message1->Message2类型的转换;
[4] 业务Handler打印消息类型和消息;
实现类依次为:
// MyMessageDecoder1
public class MyMessageDecoder1 extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {
out.add(msg.toString(Charset.defaultCharset()));
}
}
// MyMessageDecoder2
class MyMessageDecoder2 extends MessageToMessageDecoder<String> {
@Override
protected void decode(ChannelHandlerContext ctx, String msg, List<Object> out) {
String[] decodedMsgs = msg.split(";");
for (String decodedMsg : decodedMsgs) {
out.add(new Message1(decodedMsg));
}
}
}
// MyMessageDecoder3
class MyMessageDecoder3 extends MessageToMessageDecoder<Message1> {
@Override
protected void decode(ChannelHandlerContext ctx, Message1 msg, List<Object> out) {
out.add(new Message2(msg.getContent()));
}
}
业务Handler定义如下:
private static class MyHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(msg);
}
}
Message1和Message2消息定义如下:
@Data
@RequiredArgsConstructor
pulic class Message1 {
private final String content;
}
@Data
@RequiredArgsConstructor
pulic class Message2 {
private final String content;
}
客户端发送消息:"test1;test2;test3"时:
Microsoft Telnet> send test1;test2;test3
发送字符串 test1;test2;test3
Microsoft Telnet>
服务器日志如下所示:
Message2(content=test1)
Message2(content=test2)
Message2(content=test3)
注意:解码的顺序沿着pipeline进行,因此需要注意调整netty解码器在pipeline中的位置。
如果将3和解码器2的顺序调整一下:
protected void initChannel(NioSocketChannel channel) {
channel.pipeline().addLast(new MyMessageDecoder1());
channel.pipeline().addLast(new MyMessageDecoder3());
channel.pipeline().addLast(new MyMessageDecoder2());
channel.pipeline().addLast(new MyHandler());
}
重复上述操作,服务器日志如下:
Message1(content=test1)
Message1(content=test2)
Message1(content=test3)
此时,解码器1流出的数据为String类型,流入解码器2时-类型校验不通过直接以流入的String类型流出,流入解码器3时,将String类型转为Message1类型,流入业务Handler进行打印。
3.编码器
业务编码器的终点是ByteBuf类型
netty中可以通过继承MessageToMessageEncoder类自定义解码器类。MessageToMessageEncoder继承自ChannelOutboundHandlerAdapter,ChannelOutboundHandlerAdapter使用默认方式实现(不处理,向前传递事件)了所有的Outbound接口。因此,MessageToMessageEncoder只需要重写write方法,并在该方法中编码消息、并通过ChannelInvoker将编码后的消息发送到pipeline即可。
MessageToMessageEncoder抽象类的实现如下:
public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerAdapter {
private final TypeParameterMatcher matcher;
protected MessageToMessageEncoder() {
matcher = TypeParameterMatcher.find(this, MessageToMessageEncoder.class, "I");
}
protected MessageToMessageEncoder(Class<? extends I> outboundMessageType) {
matcher = TypeParameterMatcher.get(outboundMessageType);
}
public boolean acceptOutboundMessage(Object msg) throws Exception {
return matcher.match(msg);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
CodecOutputList out = null;
try {
if (acceptOutboundMessage(msg)) {
out = CodecOutputList.newInstance();
@SuppressWarnings("unchecked")
I cast = (I) msg;
try {
encode(ctx, cast, out);
} finally {
ReferenceCountUtil.release(cast);
}
if (out.isEmpty()) {
throw new EncoderException(
StringUtil.simpleClassName(this) + " must produce at least one message.");
}
} else {
ctx.write(msg, promise);
}
} catch (EncoderException e) {
throw e;
} catch (Throwable t) {
throw new EncoderException(t);
} finally {
if (out != null) {
try {
final int sizeMinusOne = out.size() - 1;
if (sizeMinusOne == 0) {
ctx.write(out.getUnsafe(0), promise);
} else if (sizeMinusOne > 0) {
if (promise == ctx.voidPromise()) {
writeVoidPromise(ctx, out);
} else {
writePromiseCombiner(ctx, out, promise);
}
}
} finally {
out.recycle();
}
}
}
}
private static void writeVoidPromise(ChannelHandlerContext ctx, CodecOutputList out) {
final ChannelPromise voidPromise = ctx.voidPromise();
for (int i = 0; i < out.size(); i++) {
ctx.write(out.getUnsafe(i), voidPromise);
}
}
private static void writePromiseCombiner(ChannelHandlerContext ctx, CodecOutputList out, ChannelPromise promise) {
final PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
for (int i = 0; i < out.size(); i++) {
combiner.add(ctx.write(out.getUnsafe(i)));
}
combiner.finish(promise);
}
protected abstract void encode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
}
3.1 类型的匹配器
MessageToMessageEncoder内部维护了一个TypeParameterMatcher类型的匹配器对象matcher,用于指定该编码器器可以处理的消息类型,与解码器中的matcher作用完全相同,不再赘述。
3.2 write方法
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
CodecOutputList out = null;
try {
// 判断当前编码器是否可以编码消息
if (acceptOutboundMessage(msg)) {
out = CodecOutputList.newInstance();
@SuppressWarnings("unchecked")
I cast = (I) msg;
try {
// 编码消息,并将编码后的消息保存到out列表中
encode(ctx, cast, out);
} finally {
ReferenceCountUtil.release(cast);
}
if (out.isEmpty()) {
throw new EncoderException(
StringUtil.simpleClassName(this) + " must produce at least one message.");
}
} else {
// 不能编码的消息不处理,直接沿着pipeline向前传递
ctx.write(msg, promise);
}
} catch (EncoderException e) {
throw e;
} catch (Throwable t) {
throw new EncoderException(t);
} finally {
// 遍历out,依次调用ctx.write,沿着pipeline向前传递
if (out != null) {
try {
final int sizeMinusOne = out.size() - 1;
if (sizeMinusOne == 0) {
ctx.write(out.getUnsafe(0), promise);
} else if (sizeMinusOne > 0) {
if (promise == ctx.voidPromise()) {
writeVoidPromise(ctx, out);
} else {
writePromiseCombiner(ctx, out, promise);
}
}
} finally {
// 清理out列表,回收再利用
out.recycle();
}
}
}
}
逻辑较为清晰:
[1] 构造列表对象out,用于临时存放编码后的消息;
[2] 判断当前编码器是否可以处理该消息,不可以处理,直接通过ctx.write沿着pipeline向前传递;可以处理,调用encode方法编码消息,编码结果添加到out中;
[3] 遍历out列表,将消息以write事件传递给向pipeline;
[4] out清理、回收再利用;
3.2 encode方法
encode方法是实际进行消息转换的逻辑,由子类根据业务具体实现:
protected abstract void encode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
将msg消息进行编码,编码后的对象存放在out中;由于out是数组,因此可以从msg中编码出一个对象,也可以编码出多个,与解码器逻辑相同。
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {
out.add(msg.toString(charset));
}
将ByteBuf类型的msg消息转为一个String类型的对象;
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {
String[] decodedMsgs = msg.toString(charset).split(";");
for (String decodedMsg: decodedMsgs) {
out.add(decodedMsg);
}
}
将ByteBuf转为String,并按照;分隔符进行拆分,每个字符串作为一个消息对象。
netty向外发送数据时,一般经过业务Handler->编码器->HeadContext的流程。
向客户端发送消息的底层实现在HeadContext的unsafe对象(NioSocketChannel的unsafe对象)中,而发送前有消息类型判断:
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler{
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
unsafe.write(msg, promise);
}
}
unsafe对象的write方法如下:
public final void write(Object msg, ChannelPromise promise) {
//...
msg = filterOutboundMessage(msg);
//...
}
在真实写操作前,通过filterOutboundMessage进行消息类型的判断:
@Override
protected final Object filterOutboundMessage(Object msg) {
// 要求消息必须时ByteBuf或者FileRegion类型或其子类型
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (buf.isDirect()) {
return msg;
}
return newDirectBuffer(buf);
}
if (msg instanceof FileRegion) {
return msg;
}
throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
由此,编码器将消息传递给HeadContext前,需要将消息最终编码为ByteBuf类型。
4.解码器案例
案例结构图如下所示:
在章节2中的案例基础上新增两个编码器,并修改业务Handler:
[1] 业务Handler,接收客户端消息后,响应相同消息;
[2] 编码器1:将Message2类型的消息转为String类型;
[3] 编码器2: 将String类型消息转为ByteBuf类型;
代码实现如下:
修改业务Handler:
private static class MyHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(msg);
// 新增逻辑“将消息对象发送给客户端
ctx.write(msg);
}
}
添加编码器:
// 将Message2消息转为String消息
public class MyEncoder1 extends MessageToMessageEncoder<Message2> {
@Override
protected void encode(ChannelHandlerContext ctx, Message2 msg, List<Object> out) throws Exception {
out.add(msg.getContent());
}
}
// 将String消息转为ByteBuf消息
public class MyEncoder2 extends MessageToMessageEncoder<String> {
@Override
protected void encode(ChannelHandlerContext ctx, String msg, List<Object> out) {
out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), Charset.defaultCharset()));
}
}
在MyHandler前依次添加解码器MyEncoder2和MyEncoder1:
protected void initChannel(NioSocketChannel channel) {
channel.pipeline().addLast(new MyMessageDecoder1());
channel.pipeline().addLast(new MyMessageDecoder2());
channel.pipeline().addLast(new MyMessageDecoder3());
channel.pipeline().addLast(new MyEncoder2());
channel.pipeline().addLast(new MyEncoder1());
channel.pipeline().addLast(new MyHandler());
}
可以使用Netty写一个客户端, 也可用客户端工具模拟,这里为了方便,使用SocketTool.exe,控制台日志如下:
14:36:15 发送数据:test1;test2;test3[1次]
14:36:15 收到数据:test1test2test3
注意:客户端收到了test1test2test3消息,在客户端开来是一个消息,但在服务器看来是连续发送的3个消息,消息内容分别为test1和test2和test3。这是TCP的流传输模式导致,可在业务层添加额外处理解决这个问题。将在下一篇文件介绍Netty如何处理粘包和分包问题。