1 编码和解码
编写网络应用程序时,因为数据在网络传输的都是二进制字节码数据,在发送数据时进行编码,在接受数据时进行解码
codec(编码器)的组成部分有2个:decoder(解码器)和encoder(编码器)。encoder负责将数据转换为字节码数据,而decoder负责将字节码数据转为业务数据
2 Netty的编码和解码机制问题
Netty提供的编码器:
StringEncoder:对字符串数据进行编码
ObjectEncoder:对java对象进行编码
Netty提供的解码器:
StringDecoder:对字符串数据进行编码
ObjectDecoder:对java对象进行编码
Netty本身自带的ObjectDecoder和ObjectEncoder可以实现用来对JOPO对象或各种业务对象的编码和解码。底层使用的依然是Java序列化技术,而java序列化技术本身就存在问题
无法跨语言
序列化的体积太大,是二进制编码的5倍多
序列化的性能太低
所以就出现了Google的Protobuf。
3 ProtoBuf
3.1基本介绍:
1) ProtoBuf是Google发布的开源项目,是一种轻便高效的结构化数据存储格式,可以用于结构化数据串行化,或者说序列化,它很适合做数据存储或RPC[远程过程调用]数据交换格式,目前很多公司正在由http+json->tcp+protobuf
Protobuf是以message的方式来管理数据的
支持跨平台,跨语言,即客户端和服务端可以是不同语言编写的
高性能,可读性高
使用protobuf编译器能够自动生成代码,ProtoBuf是将类的定义使用.protobuf文件进行描述(注意:在idea中编写.proto文件时,会自动提示是否下载.protot编写插件,可以让语法高亮)
然后通过.protoc.exe编译器根据.protp自动生成java文件
3.2 ProtoBuf生成类(单种类型)
实例要求:客户端发送一个Student PoJo对象到服务器(通过ProtoBuf编码),服务端接收Student PoJo对象,并显示信息(通过ProtoBuf解码)
步骤一:在maven中引入ProtoBuf坐标,下载相关jar包
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.7.1</version>
</dependency>
步骤二:github下载相关protoc文件(注意:下载文件需和pom文件中版本对应,否则生成java文件时会报错)
下载链接:https://github.com/protocolbuffers/protobuf/tags
然后进行相关idea配置:https://blog.csdn.net/Xin_101/article/details/123332526?
步骤三:编写代码
服务端:
public class NettyServer {
public static void main(String[] args) throws Exception {
//1.创建2个线程组bossGroup和workerGroup
//2 bossGroup只是处理连接请求,workerGroup真正的和客户端进行业务处理
//3 两个都是无限循环
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//创建服务器端的启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup,workerGroup)//设置两个线程组
.channel(NioServerSocketChannel.class) //使用nioSocketChannel作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG,128)//设置线程队列得到连接
.childOption(ChannelOption.SO_KEEPALIVE,true)//设置保持活动连接状态
.childHandler(new ChannelInitializer<SocketChannel>() {
//给pipeline设置处理器
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//向pipeline加入ProtobufDecoder,指定对哪种对象进行解码
ch.pipeline().addLast("decoder",new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));
ch.pipeline().addLast(new NettyServerHandler());
}
});//给workerGroup的EventLoop对应的管道设置处理器
System.out.println("....服务器 is ready...");
//绑定一个端口并且同步,生成了一个ChannelFuture对象
//启动服务器(并绑定端口)
ChannelFuture cf = bootstrap.bind(6668).sync();
//对关联通道进行监听
cf.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
服务端处理器:
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
//读取数据的事件(这里读取客户端发送的消息)
/**
*
* @param ctx ChannelHandlerContext ctx:上下文对象,含有管道pipeline,通道channel,地址
* @param msg 就是客户端发送的消息
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//读取从客户端发送的StudentPojo,Student
StudentPOJO.Student student = (StudentPOJO.Student)msg;
System.out.println("客户端发送的数据 id="+student.getId() + " 名字="+student.getName());
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer(LocalDateTime.now()+" :hello,客户端~",CharsetUtil.UTF_8));
}
//处理异常,一般需要关闭通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
客户端:
public class NettyClient {
public static void main(String[] args) throws Exception{
//客户端需要一个循环组
NioEventLoopGroup group = new NioEventLoopGroup();
try {
//创建客户端的启动对象
//注意客户端使用的是Bootstrap不是ServerBootstrap
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group) //设置线程组
.channel(NioSocketChannel.class) //设置客户端通道的实现类
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//在pipeline中加入ProtobufEncoder
ch.pipeline().addLast("encoder",new ProtobufEncoder());
ch.pipeline().addLast(new NettyClientHandler()); //加入自己的处理器
}
});
System.out.println("客户端 ok ...");
//启动客户端连接服务器 ChannelFuture涉及到netty的异步模型
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
//给关闭通道进行监听
channelFuture.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
}
客户端处理器:
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
//当通道就绪会触发该方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//发送一个Student对象到服务器
StudentPOJO.Student student = StudentPOJO.Student.newBuilder().setId(4).setName("王五").build();
ctx.writeAndFlush(student);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf)msg;
System.out.println("服务器回复的消息:"+buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器的地址:"+ctx.channel().remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
如果不想在channelRead()方法中进行强转,可以将处理器实现SimpleChannelInboundHandler类型,从而实现channelRead0()方法
proto文件:
syntax = "proto3"; //版本
option java_outer_classname = "StudentPOJO"; //生成的外部类类名,同时也是文件名
//protobuf使用message管理数据
message Student{ //会在StudentPOJO 外部类生成一个内部类 Student
//他是真正发送的POJO对象
int32 id = 1; //Student类中有一个属性id 类型为int32(protobuf类型) 注意: 1表示属性序号 不是值
string name = 2;
}
将proto文件转为java文件结果如下:
先启动客户端,再次启动服务端:
客户端控制台:
服务端控制台:
发现,已经用protobuf实现了数据的传输。
3.3 ProtoBuf生成类(多种类型)
实例要求:客户端发送一个Student PoJo \Worker PoJo对象到服务器(通过ProtoBuf编码),服务端接收Student PoJo\Worker PoJo对象,并显示信息(通过ProtoBuf解码)
服务端代码;
package com.liubujun.codec2;
import com.liubujun.codec.StudentPOJO;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
/**
* @Author: liubujun
* @Date: 2023/2/15 10:24
*/
public class NettyServer {
public static void main(String[] args) throws Exception {
//1.创建2个线程组bossGroup和workerGroup
//2 bossGroup只是处理连接请求,workerGroup真正的和客户端进行业务处理
//3 两个都是无限循环
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//创建服务器端的启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup,workerGroup)//设置两个线程组
.channel(NioServerSocketChannel.class) //使用nioSocketChannel作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG,128)//设置线程队列得到连接
.childOption(ChannelOption.SO_KEEPALIVE,true)//设置保持活动连接状态
.childHandler(new ChannelInitializer<SocketChannel>() {
//给pipeline设置处理器
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//向pipeline加入ProtobufDecoder,指定对哪种对象进行解码
ch.pipeline().addLast("decoder",new ProtobufDecoder(
MyDataInfo.MyMessage.getDefaultInstance()));
ch.pipeline().addLast(new NettyServerHandler());
}
});//给workerGroup的EventLoop对应的管道设置处理器
System.out.println("....服务器 is ready...");
//绑定一个端口并且同步,生成了一个ChannelFuture对象
//启动服务器(并绑定端口)
ChannelFuture cf = bootstrap.bind(6668).sync();
//对关联通道进行监听
cf.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
服务端处理器:
package com.liubujun.codec2;
import com.liubujun.codec.StudentPOJO;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
import java.time.LocalDateTime;
/**
* @Author: liubujun
* @Date: 2023/2/15 10:25
*/
public class NettyServerHandler extends SimpleChannelInboundHandler<MyDataInfo.MyMessage> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, MyDataInfo.MyMessage msg) throws Exception {
//根据dataType 来显示不同的信息
MyDataInfo.MyMessage.DataType dataType = msg.getDataType();
if (dataType == MyDataInfo.MyMessage.DataType.StudentType){
MyDataInfo.Student student = msg.getStudent();
System.out.println("学生id="+student.getId() +" 学生名字="+student.getName());
}else if (dataType == MyDataInfo.MyMessage.DataType.WorkerType){
MyDataInfo.Worker worker = msg.getWorker();
System.out.println("工人年纪="+worker.getAge() +" 工人姓名="+worker.getName());
}else {
System.out.println("传输的类型不正确");
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer(LocalDateTime.now() + " :hello,客户端~", CharsetUtil.UTF_8));
}
//处理异常,一般需要关闭通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
客户端:
package com.liubujun.codec2;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
/**
* @Author: liubujun
* @Date: 2023/2/15 10:27
*/
public class NettyClient {
public static void main(String[] args) throws Exception{
//客户端需要一个循环组
NioEventLoopGroup group = new NioEventLoopGroup();
try {
//创建客户端的启动对象
//注意客户端使用的是Bootstrap不是ServerBootstrap
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group) //设置线程组
.channel(NioSocketChannel.class) //设置客户端通道的实现类
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//在pipeline中加入ProtobufEncoder
ch.pipeline().addLast("encoder",new ProtobufEncoder());
ch.pipeline().addLast(new NettyClientHandler()); //加入自己的处理器
}
});
System.out.println("客户端 ok ...");
//启动客户端连接服务器 ChannelFuture涉及到netty的异步模型
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
//给关闭通道进行监听
channelFuture.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
}
客户端处理器:
package com.liubujun.codec2;
import com.liubujun.codec.StudentPOJO;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import java.util.Random;
/**
* @Author: liubujun
* @Date: 2023/2/15 10:28
*/
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
//当通道就绪会触发该方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//随机的发送Student 或者 Worker对象
int random = new Random().nextInt(3);
MyDataInfo.MyMessage myMessage = null;
if (0 == random){ //发送Student对象
myMessage = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.StudentType)
.setStudent(MyDataInfo.Student.newBuilder().setId(5).setName("小飞棍来喽").build()).build();
}else { //发送一个Worker对象
myMessage = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.WorkerType)
.setWorker(MyDataInfo.Worker.newBuilder().setAge(5).setName("来了 老李").build()).build();
}
ctx.writeAndFlush(myMessage);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf)msg;
System.out.println("服务器回复的消息:"+buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器的地址:"+ctx.channel().remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
proto文件:
syntax = "proto3";
option optimize_for = SPEED; //加快解析
option java_package = "com.liubujun.codec2";//指定生成到哪个包下
option java_outer_classname = "MyDataInfo"; //外部类名称
//proto可以使用message管理其它的message
message MyMessage {
//定义一个枚举类型
enum DataType {
StudentType = 0; //在proto3中要求enum的编号从0开始
WorkerType = 1;
}
//用data_type来标识传的是哪一个枚举类型
DataType data_type = 1;
//表示每次枚举类型最多只能出现其中一个,节省空间
oneof dataBody{
Student student = 2;
Worker worker = 3;
}
}
message Student{
int32 id = 1; //Studnet类的属性
string name = 2;
}
message Worker{
string name = 1;
int32 age = 2;
}
先启动服务端,再启动多个客户端:发现数据正常传输