文章目录
- 概述
- ObjectEncoder
- ObjectDecoder
- Code
- 源码分析
- ObjectEncoder
- ObjectDecoder
- 小结
概述
Netty是一个高性能、异步的网络应用程序框架,它提供了对TCP、UDP和文件传输的支持。在Netty中,数据的发送和接收都是以字节流的形式进行的,因此需要将对象转换为字节流(编码)以及将字节流转换回对象(解码)。
ObjectEncoder
ObjectEncoder
是 Netty 中用于将对象编码为字节流的一种组件。在 Netty 的 pipeline 中,当你需要将某个对象发送到网络时,你可以使用 ObjectEncoder
来实现。它会将对象序列化为字节流,以便可以在网络中传输。
例如,当你使用 Netty 的 Bootstrap
类来配置你的客户端时,你可以为你的 channel pipeline 添加一个 ObjectEncoder
:
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ObjectEncoder());
// 添加其他 handlers...
}
});
在这个例子中,ObjectEncoder
被添加到了 channel 的 pipeline 中,这样在数据传输过程中,发送的对象就会被自动编码为字节流。
ObjectDecoder
与 ObjectEncoder
相对应,ObjectDecoder
是用于将接收到的字节流解码为对象的组件。当你在 Netty 的 pipeline 中接收到字节流时,你可以使用 ObjectDecoder
来自动将字节流反序列化为对象。
继续上面的例子,如果你想在 pipeline 中添加 ObjectDecoder
,你可以这样做:
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ObjectDecoder());
// 添加其他 handlers...
}
});
在这个例子中,ObjectDecoder
被添加到了 channel 的 pipeline 中,这样在数据接收过程中,接收到的字节流就会被自动解码为对象。
总的来说,ObjectEncoder
和 ObjectDecoder
是 Netty 中用于对象序列化和反序列化的工具,它们让开发者可以更方便地在网络中传输对象。
Code
这段代码是一个简单的Netty服务器启动类
package com.artisan.codec.objectencoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
public class NettyServer {
public static void main(String[] args) throws Exception {
// 创建事件循环组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 创建ServerBootstrap
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 配置ServerBootstrap
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 初始化通道
ChannelPipeline pipeline = ch.pipeline();
// 添加ObjectDecoder
pipeline.addLast(new ObjectDecoder(10240, ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));
// 添加自定义的处理器
pipeline.addLast(new NettyServerHandler());
}
});
// 打印日志
System.out.println("netty server start。。");
// 绑定端口并启动服务器
ChannelFuture channelFuture = serverBootstrap.bind(4567).sync();
// 等待服务器通道关闭
channelFuture.channel().closeFuture().sync();
} finally {
// 优雅地关闭事件循环组
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
在上述代码中,NettyServer
类通过ServerBootstrap
配置并启动了一个Netty服务器。服务器使用了两个事件循环组:一个用于处理连接(bossGroup
),另一个用于处理已连接的通道(workerGroup
)。
在initChannel
方法中,初始化了SocketChannel
的通道 pipeline,并添加了ObjectDecoder
和自定义的处理器NettyServerHandler
。ObjectDecoder
用于反序列化接收到的字节流为Java对象,NettyServerHandler
用于处理业务逻辑。
服务器启动后,会绑定到指定端口(本例中为4567),并等待服务器通道关闭。在关闭服务器之前,通过调用shutdownGracefully
方法优雅地关闭事件循环组。
请注意,此代码片段仅作为Netty服务器启动的示例,实际应用中需要根据具体业务需求调整NettyServerHandler
以实现相应的功能。
package com.artisan.codec.objectencoder;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 当接收到客户端发送的消息时,执行该方法
System.out.println("从客户端读取到Object:" + ((ArtisanSimple) msg).toString());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 当发生异常时,执行该方法
cause.printStackTrace();
ctx.close();
}
}
package com.artisan.codec.objectencoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ObjectEncoder;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
public class NettyClient {
public static void main(String[] args) throws Exception {
// 创建事件循环组
EventLoopGroup group = new NioEventLoopGroup();
try {
// 创建Bootstrap
Bootstrap bootstrap = new Bootstrap();
// 配置Bootstrap
bootstrap.group(group).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 初始化通道
ChannelPipeline pipeline = ch.pipeline();
// 添加ObjectEncoder
pipeline.addLast(new ObjectEncoder());
// 添加自定义的处理器
pipeline.addLast(new NettyClientHandler());
}
});
// 打印日志
System.out.println("netty client start。。");
// 连接到服务器
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 4567).sync();
// 等待客户端通道关闭
channelFuture.channel().closeFuture().sync();
} finally {
// 优雅地关闭事件循环组
group.shutdownGracefully();
}
}
}
在上述代码中,NettyClient
类通过Bootstrap
配置并启动了一个Netty客户端。客户端使用了一个事件循环组(group
)来处理通道的连接和接收到的消息。
在initChannel
方法中,初始化了SocketChannel
的通道 pipeline,并添加了ObjectEncoder
和自定义的处理器NettyClientHandler
。ObjectEncoder
用于将Java对象序列化为字节流,NettyClientHandler
用于处理业务逻辑。
客户端启动后,会连接到指定IP地址(本例中为127.0.0.1)和端口(本例中为4567)的服务器,并等待客户端通道关闭。在关闭客户端之前,通过调用shutdownGracefully
方法优雅地关闭事件循环组。
请注意,此代码片段仅作为Netty客户端启动的示例,实际应用中需要根据具体业务需求调整NettyClientHandler
以实现相应的功能。
这段代码是一个自定义的Netty处理器,名为NettyClientHandler
。它继承自ChannelInboundHandlerAdapter
,用于处理客户端接收到的消息和通道激活事件。
package com.artisan.codec.objectencoder;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 当接收到服务器发送的消息时,执行该方法
System.out.println("收到服务器消息:" + msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 当通道激活时,执行该方法
System.out.println("NettyClientHandler发送数据");
// 测试对象编解码
ArtisanSimple artisanSimple = new ArtisanSimple(1, "xxxx");
ctx.writeAndFlush(artisanSimple);
}
}
在上述代码中,NettyClientHandler
类重写了channelRead
和channelActive
方法。
channelRead
方法用于处理客户端接收到的服务器消息。在这个例子中,它将打印出接收到的消息。在实际应用中,你可以根据业务需求修改此方法以处理不同的消息类型和逻辑。
channelActive
方法用于处理通道激活事件。在这个例子中,它将打印一条日志,并测试对象编解码功能。具体来说,它创建了一个ArtisanSimple
对象,并通过ctx.writeAndFlush()
方法将其发送到服务器。
在实际应用中,你可以根据需求修改此方法以实现不同的业务逻辑。
NettyClientHandler
处理器需要与ObjectEncoder
和ObjectDecoder
配合使用,以确保发送和接收到的字节流能够正确地反序列化为Java对象。在客户端启动类NettyClient
中,NettyClientHandler
已经添加到了通道的pipeline中,因此可以处理发送和接收到的消息。
package com.artisan.codec.objectencoder;
import java.io.Serializable;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
public class ArtisanSimple implements Serializable {
private int id;
private String name;
public ArtisanSimple() {
}
public ArtisanSimple(int id, String name) {
super();
this.id = id;
this.name = name;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "ArtisanSimple{" +
"id=" + id +
", name='" + name + '\'' +
'}';
}
}
package com.artisan.codec.objectencoder;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
public class AddressSimple {
private String location;
public String getLocation() {
return location;
}
public void setLocation(String location) {
this.location = location;
}
public AddressSimple() {
}
public AddressSimple(String location) {
this.location = location;
}
@Override
public String toString() {
return "AddressSimple{" +
"location='" + location + '\'' +
'}';
}
}
【测试】
源码分析
ObjectEncoder
这段代码定义了一个名为ObjectEncoder
的类,它属于Netty网络通信框架的一部分,用于将Java对象序列化为字节流。
下面是对代码的详细分析以及增加的中文注释:
package io.netty.handler.codec.serialization;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
/**
* An encoder which serializes a Java object into a {@link ByteBuf}.
* <p>
* 请注意,此编码器产生的序列化形式与标准的{@link ObjectInputStream}不兼容。
* 请使用{@link ObjectDecoder}或{@link ObjectDecoderInputStream}以确保与该编码器的互操作性。
*/
@Sharable
public class ObjectEncoder extends MessageToByteEncoder<Serializable> {
// 定义一个占位符,用于标记ByteBuf中对象序列化数据长度的位置
private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
// 覆写encode方法,实现序列化逻辑
@Override
protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception {
int startIdx = out.writerIndex(); // 记录开始编码的位置
// 创建一个ByteBufOutputStream包装器,用于向ByteBuf中写入数据
ByteBufOutputStream bout = new ByteBufOutputStream(out);
ObjectOutputStream oout = null;
try {
bout.write(LENGTH_PLACEHOLDER); // 先写入长度占位符
// 创建一个紧凑型ObjectOutputStream,用于序列化对象
oout = new CompactObjectOutputStream(bout);
oout.writeObject(msg); // 将要序列化的对象写入流中
oout.flush(); // 刷新输出流,确保所有数据都被写出
} finally {
// 关闭ObjectOutputStream和ByteBufOutputStream
if (oout != null) {
oout.close();
} else {
bout.close();
}
}
int endIdx = out.writerIndex(); // 记录编码结束的位置
// 设置占位符的长度,即实际序列化数据长度
out.setInt(startIdx, endIdx - startIdx - 4);
}
}
在上述代码中,ObjectEncoder
类继承自MessageToByteEncoder
,这意味着它是一个用于将某种类型消息编码成字节流的编码器。encode
方法被重写以实现序列化过程。在这个方法中,首先通过ByteBufOutputStream
向ByteBuf
写入了一个长度占位符,然后通过CompactObjectOutputStream
将传入的Serializable
对象序列化成字节流,并写入到ByteBuf
中。最后,修改了长度占位符,将其设置为实际序列化数据的长度。
此代码片段使用@Sharable
注解标记,表明这个ChannelHandler
是可以共享给多个ChannelPipeline
的。
序列化完成后,通过ObjectOutputStream
的flush
方法刷新流,确保所有数据都被写出。最后,在finally
块中关闭输出流,确保资源被正确释放。
ObjectDecoder
这段代码定义了一个名为ObjectDecoder
的类,它也是Netty网络通信框架的一部分,用于将接收到的字节流反序列化为Java对象。
下面是对代码的详细分析以及增加的中文注释:
package io.netty.handler.codec.serialization;
// 引入Netty相关类库
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
// 引入Java序列化相关类库
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.StreamCorruptedException;
/**
* A decoder which deserializes the received {@link ByteBuf}s into Java
* objects.
* <p>
* 请注意,此解码器期望的序列化形式与标准的{@link ObjectOutputStream}不兼容。
* 请使用{@link ObjectEncoder}或{@link ObjectEncoderOutputStream}以确保与该解码器的互操作性。
*/
public class ObjectDecoder extends LengthFieldBasedFrameDecoder {
// ClassResolver用于加载序列化对象的类
private final ClassResolver classResolver;
/**
* 创建一个新的解码器,其最大对象大小为1048576字节。
* 如果接收到的对象大小大于1048576字节,将抛出StreamCorruptedException异常。
*
* @param classResolver 用于此解码器的ClassResolver
*/
public ObjectDecoder(ClassResolver classResolver) {
this(1048576, classResolver);
}
/**
* 创建一个新的解码器,其最大对象大小为指定的值。
*
* @param maxObjectSize 序列化对象的最大字节长度。
* 如果接收到的对象的长度大于此值,将抛出StreamCorruptedException异常。
* @param classResolver 用于加载序列化对象类的ClassResolver
*/
public ObjectDecoder(int maxObjectSize, ClassResolver classResolver) {
super(maxObjectSize, 0, 4, 0, 4);
this.classResolver = classResolver;
}
// 覆写decode方法,实现反序列化逻辑
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame = (ByteBuf) super.decode(ctx, in);
if (frame == null) {
return null;
}
// 创建一个紧凑型ObjectInputStream,用于反序列化对象
ObjectInputStream ois = new CompactObjectInputStream(new ByteBufInputStream(frame, true), classResolver);
try {
return ois.readObject(); // 读取并返回反序列化的对象
} finally {
ois.close(); // 关闭输入流
}
}
}
在上述代码中,ObjectDecoder
类继承自LengthFieldBasedFrameDecoder
,这意味着它是一个用于解码具有长度字段帧的解码器。decode
方法被重写以实现反序列化过程。在这个方法中,首先通过LengthFieldBasedFrameDecoder
的解码方法获取到包含序列化数据的ByteBuf
帧,然后通过CompactObjectInputStream
将字节流反序列化为Java对象。
此代码片段使用了一个ClassResolver
,它负责加载序列化对象的类,从而允许在反序列化过程中创建对象。反序列化完成后,通过ObjectInputStream
的close
方法关闭输入流,确保资源被正确释放。
小结
ObjectEncoder和ObjectDecoder是Netty框架中的两个重要组件,它们分别负责将Java对象编码为字节流以及将字节流解码为Java对象。
ObjectEncoder是一个ChannelOutboundHandler,它主要负责将Java对象转换为字节流。当发送一个对象时,ObjectEncoder会根据对象的类型将其序列化为字节流,以便在网络上进行传输。ObjectEncoder通常与ObjectDecoder配合使用,以确保编码和解码过程能够正确地进行。
ObjectDecoder是一个ChannelInboundHandler,它主要负责将接收到的字节流解码为Java对象。当接收到字节流时,ObjectDecoder会根据字节流的类型进行反序列化,将字节流转换回原始的Java对象。ObjectDecoder通常与ObjectEncoder配合使用,以确保编码和解码过程能够正确地进行。
在实际应用中,ObjectEncoder
和ObjectDecoder
需要根据业务需求进行定制,以便正确地处理各种不同类型的对象。通过使用这两个组件,Netty框架可以在发送和接收消息时自动进行对象的编码和解码,简化了网络编程的复杂度。