写在前面
源码 。
在进行网络编程的时候,不可避免的需要对write出去的消息做一些处理,比如脱敏,增加统一数据等。而netty提供了ChannelOutboundHandler来允许我们拦截
消息从而可以对消息进行处理。对应的接口是io.netty.channel.ChannelHandler
,抽象了通道处理器相关逻辑。
本文就一起来看下这部分内容。
1:定义server
server main:
package com.dahuyou.netty.outboundHandlerAndInboundHandler.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyServer {
public static void main(String[] args) {
new NettyServer().bing(7397);
}
private void bing(int port) {
//配置服务端NIO线程组
EventLoopGroup parentGroup = new NioEventLoopGroup(); //NioEventLoopGroup extends MultithreadEventLoopGroup Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
EventLoopGroup childGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(parentGroup, childGroup)
.channel(NioServerSocketChannel.class) //非阻塞模式
.option(ChannelOption.SO_BACKLOG, 128)
.childHandler(new MyChannelInitializer());
ChannelFuture f = b.bind(port).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
childGroup.shutdownGracefully();
parentGroup.shutdownGracefully();
}
}
}
MyChannelInitializer:
package com.dahuyou.netty.outboundHandlerAndInboundHandler.server;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.nio.charset.Charset;
public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) {
// 基于换行符号
// channel.pipeline().addLast(new LineBasedFrameDecoder(1024));
// 解码转String,注意调整自己的编码格式GBK、UTF-8
channel.pipeline().addLast(new StringDecoder(Charset.forName("GBK")));
// 解码转String,注意调整自己的编码格式GBK、UTF-8
channel.pipeline().addLast(new StringEncoder(Charset.forName("GBK")));
// 在管道中添加我们自己的接收数据实现方法
channel.pipeline().addLast(new MyServerHandler());
}
}
MyServerHandler:
package com.dahuyou.netty.outboundHandlerAndInboundHandler.server;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.SocketChannel;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
public class MyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
SocketChannel channel = (SocketChannel) ctx.channel();
System.out.println("链接报告开始");
System.out.println("链接报告信息:有一客户端链接到本服务端");
System.out.println("链接报告IP:" + channel.localAddress().getHostString());
System.out.println("链接报告Port:" + channel.localAddress().getPort());
System.out.println("链接报告完毕");
//通知客户端链接建立成功
String str = "通知客户端链接建立成功" + " " + new Date() + " " + channel.localAddress().getHostString() + "\r\n";
//ctx.writeAndFlush(str);
}
/**
* 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("客户端断开链接" + ctx.channel().localAddress().toString());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//接收msg消息{与上一章节相比,此处已经不需要自己进行解码}
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 服务端接收到消息:" + msg);
//通知客户端链消息发送成功
String str = "随机数:" + Math.random() * 10 + "\r\n";
//ctx.writeAndFlush(str);
}
/**
* 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
System.out.println("异常信息:\r\n" + cause.getMessage());
}
}
2:定义client
client main:
package com.dahuyou.netty.outboundHandlerAndInboundHandler.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyClient {
public static void main(String[] args) {
new NettyClient().connect("127.0.0.1", 7397);
}
private void connect(String inetHost, int inetPort) {
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.AUTO_READ, true);
b.handler(new MyChannelInitializer());
ChannelFuture f = b.connect(inetHost, inetPort).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
}
}
}
MyChannelInitializer:
package com.dahuyou.netty.outboundHandlerAndInboundHandler.client;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.nio.charset.Charset;
public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
// 基于换行符号
channel.pipeline().addLast(new LineBasedFrameDecoder(1024));
// 解码转String,注意调整自己的编码格式GBK、UTF-8
channel.pipeline().addLast(new StringDecoder(Charset.forName("GBK")));
// 解码转String,注意调整自己的编码格式GBK、UTF-8
channel.pipeline().addLast(new StringEncoder(Charset.forName("GBK")));
// 在管道中添加我们自己的接收数据实现方法
channel.pipeline().addLast(new MyTuoMinOutMsgHandler());//消息出站处理器,脱敏处理
channel.pipeline().addLast(new MyCommonDataOutMsgHandler());//消息出站处理器,增加统一消息
channel.pipeline().addLast(new MyInMsgHandler()); //消息入站处理器
}
}
这里我们定义了两个ChannelOutboundHandler(本文重点!!!)
,分别是负责脱敏的MyTuoMinOutMsgHandler:
package com.dahuyou.netty.outboundHandlerAndInboundHandler.client;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
public class MyTuoMinOutMsgHandler extends ChannelOutboundHandlerAdapter {
/*@Override
public void read(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush("ChannelOutboundHandlerAdapter.read 发来一条消息\r\n");
super.read(ctx);
}*/
@Override // 调用ctx的writeAndFlush方法时会被该方法拦截执行
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("MyOutMsgHandler.write 111111111--脱敏处理");
msg += "--tuomin";
super.write(ctx, msg, promise);
}
}
以及负责增加统一数据的MyCommonDataOutMsgHandler,
package com.dahuyou.netty.outboundHandlerAndInboundHandler.client;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
public class MyCommonDataOutMsgHandler extends ChannelOutboundHandlerAdapter {
/*@Override
public void read(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush("ChannelOutboundHandlerAdapter.read 发来一条消息\r\n");
super.read(ctx);
}*/
@Override // 调用ctx的writeAndFlush方法时会被该方法拦截执行
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("MyOutMsgHandler.write 22222222 --- 增加统一数据");
msg += "-common data";
super.write(ctx, msg, promise);
}
}
先启动server,后启动client,client和server输出: