说明
- io.netty.handler.codec.ByteToMessageDecoder是一个解码器,从字节数据转换为其它类型的数据。
- ByteToMessageDecoder内部有一个累加器,将收到的字节读出来累积到一个ByteBuf中。
- ByteToMessageDecoder是个抽象类型,其中抽象函数decode(ChannelHandlerContext ctx, ByteBuf in, List out)必须由子类来实现。怎么将字节流转换为其它类型的数据由子类在这个函数中实现。所以,从本质上说,ByteToMessageDecoder并不知道怎么转换数据,必须由子类来实现转换的逻辑。它也不知道一条完整的报文应该包含多少字节。
- ByteToMessageDecoder还会将decode(ChannelHandlerContext ctx, ByteBuf in, List out)转换以后的数据(放在参数out中)通过ChannelHandlerContext的函数fireChannelRead(Object msg)进行触发,传递给ChannelPipeline中后续的ChannelHandler。
示例
服务端代码片段
package com.thb.power.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/**
* 服务端的主函数
* @author thb
*
*/
public class MainStation {
static final int PORT = Integer.parseInt(System.getProperty("port", "22335"));
public static void main(String[] args) throws Exception {
// 配置服务器
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new MainStationInitializer());
// 启动服务端
ChannelFuture f = b.bind(PORT).sync();
// 等待直到server socket关闭
f.channel().closeFuture().sync();
} finally {
// 关闭所有event loops以便终止所有的线程
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
package com.thb.power.server;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
public class MainStationInitializer extends ChannelInitializer<SocketChannel> {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new ServerHandler());
}
}
package com.thb.power.server;
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
public class ServerHandler extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
System.out.println("ServerHandler.decode->readableBytes: " + in.readableBytes());
if (in.readableBytes() < 12) {
return;
}
out.add(in.readBytes(12));
}
}
运行输出
启动服务端,然后再启动客户端,从客户端发送12个字节的报文数据。
客户端的窗口,发送了12个字节的数据:
服务端的窗口,接收到了12个字节的数据: