Netty 传输Java对象
- 介绍
- 业务场景模拟
- 流程图
- 代码展示
- 订购采购消息 POJO 类
- 订购应答消息 POJO 类
- 服务端启动类 SubscribeReqServer
- 服务端业务处理类 SubscribeServerHandler
- 客户端启动类 SubscribeClient
- 客户端 业务处理类 SubscribeClientHandler
- 效果展示
- 服务端打印
- 客户端打印
- 总结
介绍
我们Java 程序员接触到的第一种序列化或者编码技术 应该 就是Java的默认序列化,只需要序列化的POJO对象实现 java.io.Serializable接口,根据实际情况生成序列 serialVersionUID ,这个类就能通过 java.io.ObjectInput和 java.io.ObjectOutput序列化和反序列化。但是jdk自带的序列化和反序列化 的性能非常差,而且不支持夸语言传输,所以很多场景我们都不使用它。现在让我们利用netty 来实现 java对象的序列化吧。
业务场景模拟
流程图
(1)Netty 服务端接收客户端的用户订购请求信息,消息定义如下
(2)服务端接收到请求消息,对用户名进行合法性校验,如果合法,则构造订购成功的应答消息给客户端,订购应答消息定义如下:
代码展示
订购采购消息 POJO 类
public class SubscribeReq implements Serializable {
private static final long serialVersionUID=1L;
//订购编号
private int subReqID;
//用户名
private String userName;
//订购的产品名称
private String productName;
//订购者的电话号码
private String phoneNumber;
//订购者的家庭地址
private String address;
public int getSubReqID() {
return subReqID;
}
public void setSubReqID(int subReqID) {
this.subReqID = subReqID;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public String getProductName() {
return productName;
}
public void setProductName(String productName) {
this.productName = productName;
}
public String getPhoneNumber() {
return phoneNumber;
}
public void setPhoneNumber(String phoneNumber) {
this.phoneNumber = phoneNumber;
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
public String toString(){
return "SubscribeReq [ subReqID = "+subReqID+" , userName = " +
""+userName+" , productName = "+productName+" ," +
" phoneNumber = "+phoneNumber+" , address = "+address+" ]";
}
}
订购应答消息 POJO 类
public class SubscribeResp implements Serializable {
private static final long serialVersionUID=1L;
//订购编号
private int subReqID;
//订购结果 0 表示成功
private int respCode;
//可选的详细描述信息
private String desc;
public int getSubReqID() {
return subReqID;
}
public void setSubReqID(int subReqID) {
this.subReqID = subReqID;
}
public int getRespCode() {
return respCode;
}
public void setRespCode(int respCode) {
this.respCode = respCode;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
public String toString(){
return "SubscribeResp [ subReqID = "+subReqID+" " +
", respCode = "+respCode+" , desc = "+desc+" ]";
}
}
服务端启动类 SubscribeReqServer
public class SubscribeReqServer {
public void bind(int port){
//配置服务端的NIO线程组
EventLoopGroup parentGroup=new NioEventLoopGroup();
EventLoopGroup childGroup=new NioEventLoopGroup();
try {
ServerBootstrap bootstrap=new ServerBootstrap();
bootstrap.group(parentGroup,childGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
// ObjectDecoder 负责对实现 Serializable 的POJO对象进行解码
// 有多个构造函数,支持不同的ClassResolver,在此我们使用 weakCachingConcurrentResolver
//来创建线程安全的WeakReferenceMap对类加载器进行缓存,支持多线程并发访问
//为了防止异常码流和解码错位导致的内存溢出,这里将单个对象最大序列化后的字节数组长度设置为1M
socketChannel.pipeline()
.addLast(new ObjectDecoder(1024*1024,
ClassResolvers.weakCachingConcurrentResolver(
this.getClass().getClassLoader())));
// ObjectEncoder,它可以在消息发送的时候自动将实现了Serializable 接口的
//POJO对象进行编码,因此用户无需亲自对对象进行手工序列化,只需要关注自己的业务逻辑即可
socketChannel.pipeline()
.addLast(new ObjectEncoder());
socketChannel.pipeline()
.addLast(new SubscribeServerHandler());
}
});
ChannelFuture future=bootstrap.bind(port).sync();
System.out.println("netty server is started");
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//优雅退出,释放线程池资源
parentGroup.shutdownGracefully();
childGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
new SubscribeReqServer().bind(8080);
}
}
服务端业务处理类 SubscribeServerHandler
public class SubscribeServerHandler extends ChannelHandlerAdapter {
public void channelRead(ChannelHandlerContext context,Object obj){
SubscribeReq subscribeReq=(SubscribeReq)obj;
//对合法性进行交易,当前只校验userName属性。如果符合,将信息发送给客户端
if ("echo".equals(subscribeReq.getUserName())){
System.out.println("Service accept client subscribe req : "+subscribeReq);
context.writeAndFlush(resp(subscribeReq.getSubReqID()));
}
}
private SubscribeResp resp(int subId){
SubscribeResp subscribeResp=new SubscribeResp();
subscribeResp.setSubReqID(subId);
subscribeResp.setRespCode(0);
subscribeResp.setDesc("Netty learning is doing,please go on .");
return subscribeResp;
}
public void exceptionCaught(ChannelHandlerContext context,Throwable throwable){
throwable.printStackTrace();
context.close();
}
}
客户端启动类 SubscribeClient
public class SubscribeClient {
private void connect(String host, int port) {
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workGroup)
.option(ChannelOption.TCP_NODELAY, true)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(
new ObjectDecoder(1024,
ClassResolvers.cacheDisabled(this.getClass().getClassLoader()))
);
socketChannel.pipeline().addLast(new ObjectEncoder());
socketChannel.pipeline().addLast(new SubscribeClientHandler());
}
});
ChannelFuture future=bootstrap.connect(host,port).sync();
System.out.println("the netty client is started,and connected server");
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//优雅退出
workGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
new SubscribeClient().connect("127.0.00.1",8080);
}
}
客户端 业务处理类 SubscribeClientHandler
public class SubscribeClientHandler extends ChannelHandlerAdapter {
//tcp 链接成功后,就给服务端 循环发送10条信息
public void channelActive(ChannelHandlerContext context){
for (int i=0;i<10;i++){
context.write(subReq(i));
}
context.flush();
}
private SubscribeReq subReq(int i){
SubscribeReq subscribeReq=new SubscribeReq();
subscribeReq.setSubReqID(i);
subscribeReq.setAddress("深圳市宝安区西乡街道xxx");
subscribeReq.setPhoneNumber("174868xxxx");
subscribeReq.setProductName("Netty learning ");
subscribeReq.setUserName("echo");
return subscribeReq;
}
/**
* 由于对象解码器已经对 SubscribeResp 请求消息 进行了自动解码,
* 因此
* @param context
* @param obj
*/
public void channelRead(ChannelHandlerContext context,Object obj){
System.out.println("Receive server response : "+obj);
}
}
效果展示
服务端打印
客户端打印
总结
本章介绍了如何利用Netty 提供的ObjectEncoder编码器和ObjectDecoder解码器实现对普通POJO对象的序列化。通过上面的例子,我们学习了服务端和客户端的开发,并且模拟了TCP的粘包和拆包场景。还是很方便的实现了功能。利用Netty ,我们可以更多的关注自己的业务逻辑,而不用考虑这些粘包和拆包的底层问题