目录
引入MessagePack依赖
实体类
服务端代码
客户端代码
执行结果
引入MessagePack依赖
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
<version>0.6.12</version>
</dependency>
实体类
/**
 * User entity exchanged between the client and the server.
 *
 * <p>Annotated with MessagePack's {@code @Message} so that it can be serialized.
 * NOTE(review): the field declaration order is deliberately kept unchanged;
 * presumably the MessagePack template serializes fields in declaration order,
 * so reordering could change the wire format -- confirm before touching it.
 */
@Message
public class User {

    private String id;
    private String userName;
    private int age;
    private UserContact userContact;

    /** No-argument constructor; all fields keep their default values. */
    public User() {
    }

    /**
     * Creates a user without contact information.
     *
     * @param userName display name of the user
     * @param age      age of the user
     * @param id       identifier of the user
     */
    public User(String userName, int age, String id) {
        this.id = id;
        this.userName = userName;
        this.age = age;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public UserContact getUserContact() {
        return userContact;
    }

    public void setUserContact(UserContact userContact) {
        this.userContact = userContact;
    }

    /** Renders the user as {@code User{userName='..', age=.., id='..', userContact=..}}. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("User{");
        text.append("userName='").append(userName).append('\'');
        text.append(", age=").append(age);
        text.append(", id='").append(id).append('\'');
        text.append(", userContact=").append(userContact);
        text.append('}');
        return text.toString();
    }
}
/**
 * Contact details (mail address and phone number) attached to a user.
 *
 * <p>Annotated with MessagePack's {@code @Message} so that it can be serialized.
 * NOTE(review): the field declaration order is deliberately kept unchanged;
 * presumably the MessagePack template serializes fields in declaration order --
 * confirm before reordering.
 */
@Message
public class UserContact {

    private String mail;
    private String phone;

    /**
     * Creates contact details with both values set.
     *
     * @param mail  mail address
     * @param phone phone number
     */
    public UserContact(String mail, String phone) {
        this.mail = mail;
        this.phone = phone;
    }

    /** No-argument constructor; both fields start as {@code null}. */
    public UserContact() {
    }

    public String getMail() {
        return mail;
    }

    public void setMail(String mail) {
        this.mail = mail;
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    /** Renders the contact as {@code UserContact{mail='..', phone='..'}}. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("UserContact{");
        text.append("mail='").append(mail).append('\'');
        text.append(", phone='").append(phone).append('\'');
        text.append('}');
        return text.toString();
    }
}
服务端代码
/**
 * Netty server that receives length-prefixed, MessagePack-encoded {@code User}
 * objects and answers each one with a text line.
 */
public class ServerMsgPackEcho {

    /** TCP port the server binds to. */
    public static final int PORT = 9995;

    public static void main(String[] args) throws InterruptedException {
        ServerMsgPackEcho serverMsgPackEcho = new ServerMsgPackEcho();
        System.out.println("服务器即将启动");
        serverMsgPackEcho.start();
    }

    /**
     * Binds to {@link #PORT} and blocks until the server channel is closed.
     *
     * @throws InterruptedException if the calling thread is interrupted while
     *                              waiting for bind, close or shutdown
     */
    public void start() throws InterruptedException {
        // A single handler instance is shared by every accepted connection.
        // MsgPackServerHandler is annotated @ChannelHandler.Sharable and keeps its
        // count in an AtomicInteger, so the counter now covers all connections
        // instead of restarting at zero for each one.
        final MsgPackServerHandler serverHandler = new MsgPackServerHandler();
        EventLoopGroup group = new NioEventLoopGroup(); // event-loop thread group
        try {
            ServerBootstrap b = new ServerBootstrap(); // required to start a server
            b.group(group) // hand the thread group to the bootstrap
                    .channel(NioServerSocketChannel.class) // use NIO transport
                    .localAddress(new InetSocketAddress(PORT)) // listening port
                    // Every accepted connection gets its own child channel; the
                    // initializer below installs the handlers on that channel.
                    .childHandler(new ChannelInitializerImp(serverHandler));
            // bind() is asynchronous; sync() blocks until the bind has completed.
            ChannelFuture f = b.bind().sync();
            System.out.println("服务器启动完成,等待客户端的连接和数据.....");
            // Block until the server channel is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut the thread group down gracefully and wait for it to finish.
            group.shutdownGracefully().sync();
        }
    }

    /** Installs the inbound handlers on each accepted child channel. */
    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {

        /** Business handler shared across all child channels. */
        private final MsgPackServerHandler serverHandler;

        ChannelInitializerImp(MsgPackServerHandler serverHandler) {
            this.serverHandler = serverHandler;
        }

        @Override
        protected void initChannel(Channel ch) throws Exception {
            // Frames carry a 2-byte length field at offset 0 (max frame 65535 bytes);
            // the 2 length bytes are stripped before the payload is passed on.
            ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,
                    0, 2, 0,
                    2));
            // The decoder is created per channel; only the business handler is shared.
            ch.pipeline().addLast(new MsgPackDecoder());
            ch.pipeline().addLast(serverHandler);
        }
    }
}
/**
 * MessagePack-based decoder: deserializes the readable bytes of one inbound
 * {@code ByteBuf} into a {@code User} and passes it to the next handler.
 */
public class MsgPackDecoder extends MessageToMessageDecoder<ByteBuf> {

    /**
     * Created once per decoder instead of once per message, so the setup work
     * done by {@code new MessagePack()} is not repeated for every frame.
     * NOTE(review): this assumes each decoder instance serves a single channel,
     * i.e. it is created inside a ChannelInitializer and never shared -- confirm
     * MessagePack's thread-safety before sharing one decoder across channels.
     */
    private final MessagePack messagePack = new MessagePack();

    /**
     * Copies the readable bytes of {@code msg} into an array and deserializes them.
     *
     * @param ctx handler context (unused)
     * @param msg buffer expected to hold exactly one serialized {@code User}
     * @param out receives the decoded {@code User}
     * @throws Exception if deserialization fails
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)
            throws Exception {
        final int length = msg.readableBytes();
        final byte[] array = new byte[length];
        // getBytes with an absolute index copies without moving msg's readerIndex.
        msg.getBytes(msg.readerIndex(), array, 0, length);
        out.add(messagePack.read(array, User.class));
    }
}
/**
 * Server-side business handler: logs every received {@code User}, counts it,
 * and answers with a single text line.
 */
@ChannelHandler.Sharable
public class MsgPackServerHandler extends ChannelInboundHandlerAdapter {

    /** Number of users this handler instance has processed. */
    private final AtomicInteger counter = new AtomicInteger(0);

    /**
     * Handles one message read from the network.
     *
     * @param ctx handler context used to write the reply
     * @param msg object produced by the previous handler; cast to {@code User}
     * @throws Exception propagated to {@link #exceptionCaught}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // Cast the object produced by the previous handler.
        User user = (User) msg;
        System.out.println("Server Accept[" + user
                + "] and the counter is:" + counter.incrementAndGet());
        // Reply to the client, terminated by the platform line separator.
        String resp = "I process user :" + user.getUserName()
                + System.getProperty("line.separator");
        // Encode explicitly as UTF-8 rather than with the platform default charset,
        // so that a peer decoding the reply as UTF-8 sees non-ASCII user names intact.
        ctx.writeAndFlush(Unpooled.copiedBuffer(
                resp.getBytes(java.nio.charset.StandardCharsets.UTF_8)));
        // Forward the user to any handler installed after this one.
        ctx.fireChannelRead(user);
    }

    /** Prints the stack trace and closes the connection when an error occurs. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
客户端代码
/**
 * Netty client that connects to {@code ServerMsgPackEcho} and sends
 * MessagePack-encoded users, each prefixed with a 2-byte length header.
 */
public class ClientMsgPackEcho {

    /** Host name or IP address of the server. */
    private final String host;

    public ClientMsgPackEcho(String host) {
        this.host = host;
    }

    public static void main(String[] args) throws InterruptedException {
        ClientMsgPackEcho client = new ClientMsgPackEcho("127.0.0.1");
        client.start();
    }

    /**
     * Connects to the server and blocks until the connection is closed.
     *
     * @throws InterruptedException if the calling thread is interrupted while
     *                              waiting for connect, close or shutdown
     */
    public void start() throws InterruptedException {
        EventLoopGroup workers = new NioEventLoopGroup(); // event-loop thread group
        try {
            final Bootstrap bootstrap = new Bootstrap(); // required to start a client
            bootstrap.group(workers); // hand the thread group to the bootstrap
            bootstrap.channel(NioSocketChannel.class); // use NIO transport
            // Address and port of the server to connect to.
            bootstrap.remoteAddress(new InetSocketAddress(host, ServerMsgPackEcho.PORT));
            bootstrap.handler(new ChannelInitializerImp());

            ChannelFuture connected = bootstrap.connect().sync();
            System.out.println("已连接到服务器.....");
            connected.channel().closeFuture().sync();
        } finally {
            workers.shutdownGracefully().sync();
        }
    }

    /** Installs the client-side handlers on the channel. */
    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline()
                    // Outbound: prepend a 2-byte length header to every message.
                    .addLast(new LengthFieldPrepender(2))
                    // Inbound: split the server's replies into lines (max 1024 bytes),
                    // which takes care of sticky and partial packets.
                    .addLast(new LineBasedFrameDecoder(1024))
                    // Outbound: serialize the objects we send.
                    .addLast(new MsgPackEncode())
                    .addLast(new MsgPackClientHandler(5));
        }
    }
}
/**
 * MessagePack-based encoder: serializes an outbound {@code User} into the
 * output buffer.
 */
public class MsgPackEncode extends MessageToByteEncoder<User> {

    /**
     * Created once per encoder instead of once per message, so the setup work
     * done by {@code new MessagePack()} is not repeated for every write.
     * NOTE(review): this assumes each encoder instance serves a single channel,
     * i.e. it is created inside a ChannelInitializer and never shared -- confirm
     * MessagePack's thread-safety before sharing one encoder across channels.
     */
    private final MessagePack messagePack = new MessagePack();

    /**
     * Serializes {@code msg} and appends the resulting bytes to {@code out}.
     *
     * @param ctx handler context (unused)
     * @param msg user to serialize
     * @param out buffer that receives the serialized bytes
     * @throws Exception if serialization fails
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, User msg, ByteBuf out)
            throws Exception {
        byte[] raw = messagePack.write(msg);
        out.writeBytes(raw);
    }
}
/**
 * Client-side business handler: sends a batch of users once the channel becomes
 * active and prints every reply received from the server.
 */
public class MsgPackClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    /** Number of users to send when the channel becomes active. */
    private final int sendNumber;

    /** Number of replies received so far. */
    private AtomicInteger counter = new AtomicInteger(0);

    public MsgPackClientHandler(int sendNumber) {
        this.sendNumber = sendNumber;
    }

    /** Prints one reply read from the network together with the running count. */
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        String reply = msg.toString(CharsetUtil.UTF_8);
        int received = counter.incrementAndGet();
        System.out.println("client Accept[" + reply + "] and the counter is:" + received);
    }

    /** Called once the channel is active: queues every user, then flushes once. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        User[] batch = makeUsers();
        for (int i = 0; i < batch.length; i++) {
            System.out.println("Send user:" + batch[i]);
            ctx.write(batch[i]);
        }
        ctx.flush();
    }

    /** Prints the stack trace and closes the connection when an error occurs. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /** Builds the array of {@code sendNumber} users to be sent. */
    private User[] makeUsers() {
        User[] batch = new User[sendNumber];
        int index = 0;
        while (index < sendNumber) {
            batch[index] = newUser(index);
            index++;
        }
        return batch;
    }

    /** Builds the user for position {@code index} of the batch. */
    private User newUser(int index) {
        String userName = "ABC--->" + index;
        User created = new User();
        created.setAge(index);
        created.setUserName(userName);
        created.setId("No:" + (sendNumber - index));
        created.setUserContact(new UserContact(userName + "@9527.com", "133333333"));
        return created;
    }
}
核心就是在Netty的pipeline中加入基于MessagePack的自定义编解码器(MsgPackEncode、MsgPackDecoder)来完成对象的序列化和反序列化。