Overview
- io.netty.handler.codec.ReplayingDecoder is an abstract subclass of io.netty.handler.codec.ByteToMessageDecoder that decodes a byte stream into other kinds of messages.
- A subclass of ReplayingDecoder must implement decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) to perform the actual decoding.
- The biggest difference between ReplayingDecoder and ByteToMessageDecoder is that ReplayingDecoder lets a subclass implement decode() and decodeLast() as if all required bytes had already been received, without explicitly checking how many bytes are actually available.
For example, when extending ByteToMessageDecoder directly, a decode() implementation typically looks like this:
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

public class ServerBytesFramerDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        if (in.readableBytes() < 12) {
            return; // not enough data yet; wait for more
        }
        out.add(in.readBytes(12));
    }
}
Whereas when extending ReplayingDecoder directly, the decode() implementation can look like this:
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;

public class ServerBytesFramerDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        out.add(in.readBytes(12)); // replayed automatically if fewer than 12 bytes are available
    }
}
- ReplayingDecoder can do this because it passes a special ByteBuf implementation, ReplayingDecoderByteBuf, into decode():
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
    static final Signal REPLAY = Signal.valueOf(ReplayingDecoder.class, "REPLAY");
    private final ReplayingDecoderByteBuf replayable = new ReplayingDecoderByteBuf();
    private S state;
    private int checkpoint = -1;
    ...

final class ReplayingDecoderByteBuf extends ByteBuf {
    private static final Signal REPLAY = ReplayingDecoder.REPLAY;
    private ByteBuf buffer;
    private boolean terminated;
    private SwappedByteBuf swapped;
    ...
- When there are not enough bytes, ReplayingDecoderByteBuf throws an Error and control returns to ReplayingDecoder. That Error type is Signal:
public final class Signal extends Error implements Constant<Signal> {
    private static final long serialVersionUID = -221145131122459977L;
    ...
- When ReplayingDecoder catches this Error, it rewinds the buffer's readerIndex back to its initial position (the position where this decode pass started, or the last checkpoint if one was set), and calls decode() again once more data has been received into the buffer.
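This replay behaviour can be observed without opening a real connection. Below is a minimal sketch using Netty's EmbeddedChannel together with the ServerBytesFramerDecoder shown above (the ReplayDemo class name is made up for illustration, and the decoder is assumed to be on the classpath): after only 5 bytes nothing is forwarded, because decode() hit the REPLAY signal and was rewound; once 12 bytes have arrived, one frame is emitted.
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;

public class ReplayDemo {
    public static void main(String[] args) {
        EmbeddedChannel ch = new EmbeddedChannel(new ServerBytesFramerDecoder());

        // Only 5 bytes: readBytes(12) inside decode() does not have enough data,
        // the REPLAY signal is thrown and caught, and nothing is forwarded.
        ch.writeInbound(Unpooled.wrappedBuffer(new byte[5]));
        Object first = ch.readInbound();
        System.out.println("after 5 bytes:  " + first);                 // null

        // 7 more bytes: decode() is replayed, 12 bytes are now available,
        // and a single 12-byte frame is emitted.
        ch.writeInbound(Unpooled.wrappedBuffer(new byte[7]));
        ByteBuf frame = ch.readInbound();
        System.out.println("after 12 bytes: " + frame.readableBytes()); // 12
        frame.release();
        ch.finish();
    }
}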
Examples
A simple, successful example
Scenario verified in this example:
In this example, the client sends 12 bytes of data; the server's ServerBytesFramerDecoder decodes them successfully and passes the result on to the subsequent ServerRegisterRequestHandler.
Server-side code snippets
package com.thb.power.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * Main entry point of the server.
 * @author thb
 *
 */
public class MainStation {

    static final int PORT = Integer.parseInt(System.getProperty("port", "22335"));

    public static void main(String[] args) throws Exception {
        // Configure the server
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new MainStationInitializer());

            // Start the server
            ChannelFuture f = b.bind(PORT).sync();

            // Wait until the server socket is closed
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
package com.thb.power.server;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class MainStationInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline p = ch.pipeline();
        p.addLast(new LoggingHandler(LogLevel.INFO));
        p.addLast(new ServerBytesFramerDecoder());
        p.addLast(new ServerRegisterRequestHandler());
    }
}
package com.thb.power.server;

import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;

public class ServerBytesFramerDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        // Frame the stream into 12-byte messages. If fewer than 12 bytes are
        // readable, ReplayingDecoder rewinds and replays this call later.
        out.add(in.readBytes(12));
    }
}
package com.thb.power.server;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ServerRegisterRequestHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        try {
            System.out.println("ServerRegisterRequestHandler: readableBytes: " + m.readableBytes());
        } finally {
            m.release(); // last handler in the pipeline, so release the buffer here
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
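The article only lists the server side; the client is started separately. For reference, a minimal client sketch that connects to port 22335 and writes 12 bytes of business data might look like the following (the SubStation class name, the com.thb.power.client package, the loopback host, and the all-zero payload are assumptions for illustration, not code from the original article):
package com.thb.power.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class SubStation {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) {
                     // no outbound handlers are needed to send raw bytes
                 }
             });

            Channel ch = b.connect("127.0.0.1", 22335).sync().channel();

            // send exactly 12 bytes of "business data"
            byte[] payload = new byte[12];
            ch.writeAndFlush(Unpooled.wrappedBuffer(payload)).sync();

            ch.close().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}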
Start the server
Start the client
Send 12 bytes of business data from the client
Observe the server output
The server output shows that ServerRegisterRequestHandler received 12 bytes of data. Since ServerRegisterRequestHandler sits after ServerBytesFramerDecoder in the pipeline, those 12 bytes were decoded by ServerBytesFramerDecoder and passed along. Recall the order in which the ChannelHandlers were added to the ChannelPipeline:
@Override
public void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline p = ch.pipeline();
    p.addLast(new LoggingHandler(LogLevel.INFO));
    p.addLast(new ServerBytesFramerDecoder());
    p.addLast(new ServerRegisterRequestHandler());
}
The client sends less data than the server's ReplayingDecoder subclass requires
Scenario verified in this example:
In this example, the client sends 12 bytes of data, but the server's ServerBytesFramerDecoder requires 100 bytes. It never receives enough data, so the subsequent ServerRegisterRequestHandler receives nothing.
Server-side code snippet (MainStation, MainStationInitializer, and ServerRegisterRequestHandler are identical to the previous example; only ServerBytesFramerDecoder changes):
package com.thb.power.server;

import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;

public class ServerBytesFramerDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        //out.add(in.readBytes(12));
        out.add(in.readBytes(100)); // now requires 100 bytes per frame
    }
}
Start the server
Start the client and send 12 bytes of data
Observe the server output
The server output shows that the LoggingHandler received 12 bytes, but ServerRegisterRequestHandler received no data. ServerBytesFramerDecoder never received the amount of data it expects (100 bytes), so it never forwarded anything to the subsequent ServerRegisterRequestHandler.
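One consequence of this rewinding is that when a frame arrives in several pieces, everything read so far inside decode() is discarded and read again on the next call. The checkpoint field shown in the ReplayingDecoder excerpt above exists for this situation: calling checkpoint(...) moves the rewind position forward, so parts that have already been decoded are not decoded again. A minimal sketch of a hypothetical length-prefixed decoder using checkpoint() and a state enum (not part of the original article, modelled on the pattern documented for ReplayingDecoder) could look like this:
import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;

enum FrameState { READ_LENGTH, READ_CONTENT }

public class CheckpointedFrameDecoder extends ReplayingDecoder<FrameState> {

    private int length;

    public CheckpointedFrameDecoder() {
        super(FrameState.READ_LENGTH); // initial state
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        switch (state()) {
        case READ_LENGTH:
            length = in.readInt();
            // The length has been consumed; if the body is still incomplete,
            // the next replay starts here instead of re-reading the length.
            checkpoint(FrameState.READ_CONTENT);
            // fall through
        case READ_CONTENT:
            out.add(in.readBytes(length));
            checkpoint(FrameState.READ_LENGTH);
            break;
        default:
            throw new Error("Shouldn't reach here.");
        }
    }
}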