68 Netty
参考资料
- 【硬核】肝了一月的Netty知识点
概念
Netty 是一个高性能、异步事件驱动的网络应用框架,简化了 Java 网络编程,适用于构建高效、可扩展的网络服务器和客户端。
Netty 是基于 Java NIO 的异步事件驱动的网络应用框架,使用 Netty 可以快速开发网络应用,Netty 提供了高层次的抽象来简化 TCP 和 UDP 服务器的编程,但是你仍然可以使用底层的 API。
Netty是一个非阻塞的IO客户端服务器框架
阻塞(Block)与非阻塞(Non-Block)
-
阻塞和非阻塞是进程在访问数据的时候,数据是否准备就绪的一种处理方式,当数据没有准备的时候。
-
阻塞:往往需要等待缓冲区中的数据准备好过后才处理其他的事情,否则一直等待在那里。
-
非阻塞:当我们的进程访问我们的数据缓冲区的时候,如果数据没有准备好则直接返回,不会等待。如果数据已经准备好,也直接返回。
- Feign 是一个用于调用 RESTful API 的同步框架,调用方会被阻塞,直到接收到响应。在消息队列的场景中,生产者通常需要非阻塞地发送消息,以提高性能和吞吐量。
Netty 核心组件
Channel
Channel是 Java NIO 的一个基本构造。可以看作是传入或传出数据的载体。因此,它可以被打开或关闭,连接或者断开连接。
EventLoop 与 EventLoopGroup
EventLoop 定义了Netty的核心抽象,用来处理连接的生命周期中所发生的事件,在内部,将会为每个Channel分配一个EventLoop。
EventLoopGroup 是一个 EventLoop 池,包含很多的 EventLoop。
Netty 为每个 Channel 分配了一个 EventLoop,用于处理用户连接请求、对用户请求的处理等所有事件。EventLoop 本身只是一个线程驱动,在其生命周期内只会绑定一个线程,让该线程处理一个 Channel 的所有 IO 事件。
一个 Channel 一旦与一个 EventLoop 相绑定,那么在 Channel 的整个生命周期内是不能改变的。一个 EventLoop 可以与多个 Channel 绑定。即 Channel 与 EventLoop 的关系是 n:1,而 EventLoop 与线程的关系是 1:1。
ServerBootstrap 与 Bootstrap
Bootstarp 和 ServerBootstrap 被称为引导类,指对应用程序进行配置,并使他运行起来的过程。Netty处理引导的方式是使你的应用程序和网络层相隔离。
Bootstrap 是客户端的引导类,Bootstrap 在调用 bind()(连接UDP)和 connect()(连接TCP)方法时,会新创建一个 Channel,仅创建一个单独的、没有父 Channel 的 Channel 来实现所有的网络交换。
ServerBootstrap 是服务端的引导类,ServerBootstarp 在调用 bind() 方法时会创建一个 ServerChannel 来接受来自客户端的连接,并且该 ServerChannel 管理了多个子 Channel 用于同客户端之间的通信。
ChannelHandler 与 ChannelPipeline
ChannelHandler 是对 Channel 中数据的处理器,这些处理器可以是系统本身定义好的编解码器,也可以是用户自定义的。这些处理器会被统一添加到一个 ChannelPipeline 的对象中,然后按照添加的顺序对 Channel 中的数据进行依次处理。
ChannelFuture
Netty 中所有的 I/O 操作都是异步的,即操作不会立即得到返回结果,所以 Netty 中定义了一个 ChannelFuture 对象作为这个异步操作的“代言人”,表示异步操作本身。如果想获取到该异步操作的返回值,可以通过该异步操作对象的addListener() 方法为该异步操作添加监 NIO 网络编程框架 Netty 听器,为其注册回调:当结果出来后马上调用执行。
Netty 的异步编程模型都是建立在 Future 与回调概念之上的。
心跳
因为 Netty 建立的是长连接,也就是说只要不在 Client 的代码中手动 channel.close();
那该连接就会一直保持着,直到客户端或者服务器一方关闭。
也不是说长连接它就不好,但大家想想,每一个客户端都一直占着一个连接,即使它后面已经用不到服务器了,而服务器能承受的连接数是有限的,后面再来了真正有需求的用户,它也进不来了,而且长时间的高并发也可能导致服务器宕机。
所以,有没有一种办法,如果我一段时间用不到服务器,就把这个连接给关掉?答:心跳机制。所谓心跳,即在 TCP 长连接中,客户端和服务器之间定期发送的一种特殊的数据包(比如消息内容是某种要求格式、内容),通知对方自己还在线,以确保 TCP 连接的有效性。
在 Netty 中,实现心跳机制的关键是 IdleStateHandler(空闲状态处理器),它的作用跟名字一样,是用来监测连接的空闲情况。然后我们就可以根据心跳情况,来实现具体的处理逻辑,比如说断开连接、重新连接等等。
总结
Netty 是一个高性能的异步事件驱动的网络应用框架,广泛用于开发网络应用程序,如协议服务器和客户端。简单来说这个框架是基于异步事件驱动的网络应用框架,这里最重要的一个点是事件驱动。
何为事件驱动呢?事件驱动就是指它的服务逻辑都是针对发生的事件来触发的,一旦某个事件发生了,就会触发该时间的处理逻辑。比如建立连接、数据读取、数据写入等等情况。
该应用框架总体来说就是为了两个服务直接进行通信,只不过这个是异步通信,而不是像之前那样的同步通信,它是非阻塞的,具有很高的性能。那么连接两个节点之间进行通信的称为通道(Channel),通过这个通道来实现双方之间的通信。这些都是应用层面的,对于传输层来说,具体使用的是TCP协议还是UDP协议都做了适配。
ChannelHandler 是处理特定事件和数据的组件。你可以定义自己的 ChannelHandler,重写方法来处理数据的编解码、连接的生命周期事件、异常处理等。这个可以看成通道上的管理人员,我们可以在通道的两头编写的自定义的入站和出站逻辑。
在网络通信中,数据通常需要进行编码和解码。在 Netty 中,编解码的过程通过 ChannelHandler
的实现来完成。Netty 提供了 MessageToByteEncoder
和 ByteToMessageDecoder
等抽象类,简化了这一过程。
ChannelPipeline 是一个关键概念,它允许开发者将多个 ChannelHandler 组合在一起,形成一个处理链。当数据在 Channel 中流动时,它会依次经过链中的每个处理器。这种设计使得数据的处理过程可以灵活配置,支持不同的协议和业务逻辑。
例如:
public class NettyServerHandlerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
//空闲检测
.addLast(new ServerIdleStateHandler())
.addLast(new ProtobufVarint32FrameDecoder())
.addLast(new ProtobufDecoder(MessageBase.Message.getDefaultInstance()))
.addLast(new ProtobufVarint32LengthFieldPrepender())
.addLast(new ProtobufEncoder())
.addLast(new NettyServerHandler());
}
}
这里就是一个ChannelPipeline。
Reactor模型是Netty采用的线程模型,下面是其一个图:
Netty 的线程模型基于 Reactor 模式,这种模式特别适合于处理大量的并发连接。Reactor 模式的基本思路是将 I/O 操作与业务逻辑处理分离,将 I/O 事件的处理和业务逻辑处理分配给不同的线程。
事件循环: 每个 EventLoop 会不断地循环处理事件,包括连接事件、读事件和写事件。EventLoop 负责从操作系统获取可读或可写的 Channel,并将这些事件分发给对应的 ChannelHandler 进行处理。
Netty的处理架构如下:
Netty主要在Reactor的主从模型上进行了一些改进后得到了目前Netty使用的Reactor模型。
Netty工作原理
参考资料:【Netty系列】Netty高性能架构设计以及光速入门
一图见真知
Netty 抽象出了两组线程池BossGroup和WorkerGroup,其中BossGroup专门负责接受客户端的连接,WorkerGroup专门负责网络的读写。BossGroup和WorkerGroup的类型都是NioEventLoopGroup。NioEventLoopGroup相当于一个事件循环组,这个组种含有多个事件循环,每一个事件循环是一个NioEventLoop。NioEeventLoop表示一个不断循环执行处理任务的线程,可以有多个线程,也就是说可以含有多个NioEventLoop,每个NioEventLoop都有一个Selector,用于监听绑定在其上的socket的网络通讯。
EventLoop 和 EventLoopGroup:EventLoop
是 Netty 的事件循环,它处理 Channel 上的所有事件,包括 I/O 事件、任务调度等。每个 EventLoop
负责处理一个或多个 Channel
,确保同一个 Channel
的所有 I/O 操作都在同一个线程中执行,避免线程安全问题。EventLoopGroup
是 EventLoop
的集合,通常分为两类:
- BossGroup:负责处理新的连接请求。
- WorkerGroup:负责处理已经建立的连接的 I/O 操作。
Bootstrap/ServerBootstrap:用于启动 Netty 应用程序的辅助类。Bootstrap
用于客户端,ServerBootstrap
用于服务器。它们简化了配置和初始化过程。
SpringBoot集成Netty
参考资料
- Springboot 2.0 +protobuf + Netty 实战(附源码)
- Spring Boot与Netty的完美结合:打造高性能网络通信
protobuf协议格式
在整合使用 Netty 的过程中,我们使用 Google 的protobuf定义消息格式,下面来简单介绍下 protobuf。
了解一个新的事物,我们从这几个方面快速的了解:
- 它是什么?Protobuf是一个数据格式,专业术语为:一种结构化数据存储格式;
- 它有什么用?通常用于结构化数据的序列化和反序列化;
- 它相比较之前存在的有什么优点?解决了什么问题?它相对于JSON来说,性能更好,支持跨语言(因为它最后是二进制格式的数据编码)
通过上面三个问题就可以对Protobuf有一个初步清晰的了解。
引入Netty依赖
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.112.Final</version>
</dependency>
Netty服务端
@Component
@Slf4j
public class NettyServer {
/**
* boss 线程组用于处理连接工作
*/
private final EventLoopGroup boss = new NioEventLoopGroup();
/**在
* work 线程组用于数据处理
*/
private final EventLoopGroup work = new NioEventLoopGroup();
// @Value("${netty.port}")
private static final Integer port = 54021;
// @PostConstruct 是一个用于 Java EE 和 Spring 框架中的注解,标记在一个方法上,表示这个方法将在依赖注入完成后被自动调用。
// 它通常用于进行初始化操作,例如设置默认值、执行启动时的逻辑、或者进行资源的准备。
/***
* @Description 启动Netty Server
* @return {@link }
* @Author yaoHui
* @Date 2024/10/10
*/
@PostConstruct
public void start() throws InterruptedException {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss,work)
// 指定Channel
.channel(NioServerSocketChannel.class)
//使用指定的端口设置套接字地址
.localAddress(new InetSocketAddress(port))
// 方法设置了 Socket 选项 SO_BACKLOG,它指定了在连接队列中可以排队的最大连接数。当服务器正在处理现有连接时,
// 新的连接请求会被放入队列中,直到当前连接处理完成。1024 是队列的长度,这个值可以根据需要进行调整。
.option(ChannelOption.SO_BACKLOG,1024)
// 设置 SO_KEEPALIVE 选项为 true,这意味着在 TCP 连接上启用 TCP 保活机制。
// 如果连接闲置时间过长,系统会发送保活探测包,以保持连接的活跃状态。
.childOption(ChannelOption.SO_KEEPALIVE,true)
// 设置 TCP_NODELAY 选项为 true,启用 Nagle 算法。
// 这会在发送小数据包时禁用延迟,从而减少数据包的发送延迟,提高实时性。
.childOption(ChannelOption.TCP_NODELAY,true)
.childHandler(new NettyServerHandlerInitializer());
ChannelFuture future = serverBootstrap.bind().sync();
if (future.isSuccess()){
log.info("Netty Server Running");
}
}
@PreDestroy
public void destroy() throws InterruptedException {
boss.shutdownGracefully().sync();
work.shutdownGracefully().sync();
log.info("Netty Server Stop");
}
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof MessageBase.Message) {
// 将接收到的消息转换为 Protobuf 消息
MessageBase.Message message = (MessageBase.Message) msg;
// 处理消息(例如,打印消息内容)
log.info("Received message:");
log.info(message.toString());
// 可选:根据接收到的消息发送响应
MessageBase.Message response = MessageBase.Message.newBuilder()
.setRequestId(message.getRequestId())
.setCmd(MessageBase.Message.CommandType.ACK)
.build();
ctx.writeAndFlush(response); // 发送响应
} else {
// 如果接收到的消息不是预期的类型,可以选择忽略或者抛出异常
System.err.println("Received an unknown message type: " + msg.getClass().getName());
ctx.fireChannelRead(msg); // 继续向下传递
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 处理异常
cause.printStackTrace();
ctx.close(); // 关闭连接
}
}
public class NettyServerHandlerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
//空闲检测
.addLast(new ServerIdleStateHandler())
.addLast(new ProtobufVarint32FrameDecoder())
.addLast(new ProtobufDecoder(MessageBase.Message.getDefaultInstance()))
.addLast(new ProtobufVarint32LengthFieldPrepender())
.addLast(new ProtobufEncoder())
.addLast(new NettyServerHandler());
}
}
@Slf4j
public class ServerIdleStateHandler extends IdleStateHandler {
Integer times = 0;
/**
* 设置空闲检测时间为 30s
*/
private static final int READER_IDLE_TIME = 60;
public ServerIdleStateHandler() {
super(READER_IDLE_TIME, READER_IDLE_TIME, READER_IDLE_TIME, TimeUnit.SECONDS);
}
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
log.info("{} 秒内没有读取到数据,关闭连接", READER_IDLE_TIME);
System.out.println(ctx.channel().remoteAddress() + "发生超时事件--" + evt);
ctx.channel().close();
}
}
Netty客户端
@Component
@Slf4j
public class NettyClient {
private static final EventLoopGroup group = new NioEventLoopGroup();
private static final Integer port = 54021;
private static final String host = "localhost";
private static SocketChannel socketChannel;
/***
* @Description Netty客户端启动函数 调用Start可以启动对Netty服务端的连接
* @return {@link }
* @Author yaoHui
* @Date 2024/10/11
*/
@PostConstruct
private void start(){
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(host, port)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
// 空闲检测
.addLast(new IdleStateHandler(60, 60, 60)) // 60秒写空闲,30秒读空闲
.addLast(new HeartbeatHandler())
.addLast(new ProtobufVarint32FrameDecoder())
.addLast(new ProtobufDecoder(MessageBase.Message.getDefaultInstance()))
.addLast(new ProtobufVarint32LengthFieldPrepender())
.addLast(new ProtobufEncoder())
.addLast(new NettyClientHandler())
; // 自定义处理器
}
});
ChannelFuture future = bootstrap.connect();
//客户端断线重连逻辑
future.addListener((ChannelFutureListener) future1 -> {
if (future1.isSuccess()) {
log.info("连接Netty服务端成功");
} else {
log.info("连接失败,进行断线重连");
future1.channel().eventLoop().schedule(this::start, 10, TimeUnit.SECONDS);
}
});
socketChannel = (SocketChannel) future.channel();
sendMessageThread.setSocketChannel(socketChannel);
}
}
proto文件
//protobuf语法有 proto2和proto3两种,这里指定 proto3
syntax = "proto3";
// 文件选项
option java_package = "com.fang.screw.client.protocol";
option java_outer_classname = "MessageBase";
// 消息模型定义
message Message {
string requestId = 1;
CommandType cmd = 2;
string content = 3;
int32 retryCount = 4;
string urlPath = 5;
enum CommandType {
NORMAL = 0; //常规业务消息
HEARTBEAT_REQUEST = 1; //客户端心跳消息
HEARTBEAT_RESPONSE = 2; //服务端心跳消息
ACK = 3;
}
}
需要把Protobuf文件进行编译,具体如何操作请查阅相关资料。