Netty
- 自定义消息协议的实现逻辑
- 自定义编码器
- 心跳机制
- 实现客户端发送心跳包
自定义消息协议的实现逻辑
消息协议:这一次消息需要包含两个部分,即消息长度和消息内容本身。
自定义消息编码器︰消息编码器将客户端发送的消息转换成遵守消息协议的消息,即包含消息长度和消息内容的消息
自定义消息解码器∶消息解码器根据消息协议的消息长度,来获得指定长度的消息内容。
自定义编码器
自定义消息协议:
//自定义消息协议
public class MessageProtocal {
//消息的长度
private int length;
//消息的内容
private byte[] content;
public int getLength() {
return length;
}
public void setLength(int length) {
this.length = length;
}
public byte[] getContent() {
return content;
}
public void setContent(byte[] content) {
this.content = content;
}
}
客户端基本代码
public class NettyClient {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup(1);
Bootstrap bootstrap = new Bootstrap();
//设置相关的参数
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//添加处理器,分包编码器
pipeline.addLast(new MessageEncoder());
//添加具体的业务处理器
pipeline.addLast(new NettyMessageClientHandler());
}
});
System.out.println("客户端启动了");
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9090).sync();
channelFuture.channel().closeFuture().sync();
group.shutdownGracefully();
}
}
客户端业务代码
public class NettyMessageClientHandler extends SimpleChannelInboundHandler<MessageProtocal> {
//连接通道创建后要向服务端发送消息
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for(int i=0;i<200;i++){
String msg = "西安科技大学";
//创建消息协议对象
MessageProtocal messageProtocal = new MessageProtocal();
messageProtocal.setLength(msg.getBytes(StandardCharsets.UTF_8).length);
messageProtocal.setContent(msg.getBytes(StandardCharsets.UTF_8));
//发送协议对象,注意此时ctx只能发送Bytebuf数据,因此需要用编码器把它编码成Bytebuf数据
ctx.writeAndFlush(messageProtocal);
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageProtocal msg) throws Exception {
}
}
自定义编码器
public class MessageEncoder extends MessageToByteEncoder<MessageProtocal> {
@Override
protected void encode(ChannelHandlerContext ctx, MessageProtocal msg, ByteBuf out) throws Exception {
out.writeInt(msg.getLength());
out.writeBytes(msg.getContent());
}
}
服务端基本代码
public class NettyServer {
public static void main(String[] args) throws Exception {
EventLoopGroup boosGroup = new NioEventLoopGroup(1);
EventLoopGroup workGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boosGroup,workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//添加解码器
pipeline.addLast(new MessageDecoder());
pipeline.addLast(new NettyMessageServerHandler());
}
});
System.out.println("Netty的服务端启动了");
ChannelFuture channelFuture = serverBootstrap.bind(9090).sync();
channelFuture.channel().closeFuture().sync();
boosGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
自定义解码器
//自定义解码器代码
public class MessageDecoder extends ByteToMessageDecoder {
int length = 0;
//ctx
//in:客户端发送来的MessageProtocol编码后的ByteBuf数据
//out:out里的数据会被放行到下一个handler把解码出来的MessageProtocol放到out里面
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("ByteBuf:"+in);
//获得前面的4个字节的数据 == 描述实际内容的长度
if(in.readableBytes()>=4){
//ByteBuf里面可能有MessageProtocol数据
if(length==0){
length = in.readInt();
}
//length = 15
if(in.readableBytes()<length){
//说明数据还没到齐,等待下一次调用decode
System.out.println("当前数据量不够,继续等待");
return;
}
//可读数据量>=length ==> 意味着这一次的MessageProtocol的内容已经到齐了
//创建了一个指定length长度的字节数组
byte[] content = new byte[length];
//把ByteBuf里面的指定长度的数据读到content数组中
in.readBytes(content);
//创建协议MessageProtocol对象赋值
MessageProtocal messageProtocal = new MessageProtocal();
messageProtocal.setLength(length);
messageProtocal.setContent(content);
out.add(messageProtocal);
length=0;
}
}
}
服务端业务处理代码
public class NettyMessageServerHandler extends SimpleChannelInboundHandler<MessageProtocal> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageProtocal msg) throws Exception {
System.out.println("---服务器收到的数据---");
System.out.println("消息的长度:"+msg.getLength());
System.out.println("消息的内容:"+new String(msg.getContent(), StandardCharsets.UTF_8));
}
}
运行结果:
心跳机制
在分布式系统中,心跳机制常常在注册中心组件中提及,比如Zookeeper、Eureka、Nacos等,通过维护客户端的心跳,来判断客户端是否正常在线。如果客户端达到超时次数等预设的条件时,服务端将释放客户端的连接资源。
试想一下,当我们一个用来写数据的通道,它虽然没有下线,但这个通道长时间都不写数据了,是不是我们可以利用心跳机制,关闭此类通道及其对应的客户端
实现客户端发送心跳包
客户端基本代码
public class NettyClient {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup(1);
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioServerSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//添加编解码器
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new NettyClientHandler());
}
});
System.out.println("客户端启动了");
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1",9090).sync();
//模拟向服务端发送心跳数据
String packet = "heartbeat packet";
Random random = new Random();
Channel channel = channelFuture.channel();
while (channel.isActive()){
//随机的事件来实现时间间隔等待
int num = random.nextInt(10);
Thread.sleep(num*1000);
channel.writeAndFlush(packet);
}
group.shutdownGracefully();
}
}
客户端拦截器
public class NettyClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
System.out.println("客户端收到的数据"+s);
}
}
IdleStateHandler类描述三种空闲状态
读空闲:在指定时间间隔内没有从Channel中读到数据,将会创建状态为READER_IDLE的IdleStateEvent对象。
写空闲︰在指定时间间隔内没有数据写入到Channel中,将会创建状态为WRITER_IDLE的ldleStateEvent对象。
读写空闲:在指定时间间隔内Channel中没有发生读写操作,将会创建状态为ALL_IDLE的ldleStateEvent对象。
服务端基本代码
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringEncoder());
pipeline.addLast(new StringDecoder());
//超时状态处理器会在服务端发现有超过3秒没有没有发生读操作的话会触发超时事件
//创建出IdleStateEvent对象,将该对象交给下一个Handler
pipeline.addLast(new IdleStateHandler(3,0,0, TimeUnit.SECONDS));
//HeartbeatServerHandler必领重写userEventTriggered方法,用来做具体的超时的业务处理
pipeline.addLast(new HeartbeatServerHandler());
}
});
System.out.println("Netty服务端启动了");
ChannelFuture channelFuture = serverBootstrap.bind(9090).sync();
channelFuture.channel().closeFuture().sync();
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
服务端业务代码
public class HeartbeatServerHandler extends SimpleChannelInboundHandler<String> {
int readIdleTimes = 0;
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
System.out.println("服务端收到的心跳"+s);
channelHandlerContext.writeAndFlush("服务端已经收到了心跳");
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent)evt;
switch (event.state()){
case READER_IDLE:
readIdleTimes++;
break;
case WRITER_IDLE:
System.out.println("写超时");
break;
case ALL_IDLE:
System.out.println("读写超时");
break;
}
if(readIdleTimes>3){
System.out.println("读超时超过三次,关闭连接");
ctx.writeAndFlush("超时关闭");
ctx.channel().close();
}
}
}