最近我在研究Netty,之前只是经常听说,并没有实际做过研究,为什么突然要好好研究一下它,主要是因为前段时间,我在看RocketMQ底层原理的时候发现它的底层的网络通信都是基于Netty,然后网上一查,果然,大家太多的耳熟能详的工具组件,都是基于Netty做的开发。大家看看:
可以看到太多的分布式或者微服务组件都是基于它,因为分布式/微服务的根基在于网络编程,而Netty就是一款非常成熟的网络编程工具。所以它在网络编程学习中不可避免的学习目标。
接下来我会用几篇文章为大家分享一下我最近的学习成果,给大家做了很好的总结,希望能够给大家带来帮助。
1. 概述
1.1 概念
Netty是 一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。
Netty框架是基于Java原生NIO技术的进一步封装,对Java-NIO技术做了进一步增强,充分结合了Reactor线程模型,将Netty变为了一个基于异步事件驱动的网络框架。
Netty至今共发布了五个大版本,目前最常用的并非是最新的5.x系列,而是4.x系列的版本,原因是Netty本身就是基于Java-NIO封装的,而JDK本身又很稳定,再加上5.x版本并未有太大的性能差异,因此4.x系列才是主流。
1.2 应用场景
-
互联网行业:在分布式系统中,各个节点之间需要远程服务调用,高性能的 RPC 框架必不可少,Netty 作为异步高性能的通信框架,往往作为基础通信组件被这些 RPC 框架使用。典型的有:阿里的Dubbo,Rocketmq底层也是用的Netty作为基础通信组件。
-
游戏行业:无论是手游服务端还是大型的网络游戏,Java 语言得到了越来越广泛的应用。Netty 作为高性能的基础通信组件,它本身提供了 TCP/UDP 和 HTTP 协议栈。
-
大数据领域:经典的Hadoop 的高性能通信,默认采用 Netty 进行跨界点通信,它的Netty Service 基于Netty 框架二次封装实现。
1.3 入门实操
说千遍还不如让我们一起来动手感受一遍,现在就直接先实操一番快速入门。
1 引入依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.43.Final</version>
</dependency>
2 服务端
然后先创建NettyServer服务端,代码如下:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.util.CharsetUtil;
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
// 创建两个EventLoopGroup,boss:处理连接事件,worker处理I/O事件
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
// 创建一个ServerBootstrap服务端(同之前的ServerSocket类似)
ServerBootstrap server = new ServerBootstrap();
try {
// 将前面创建的两个EventLoopGroup绑定在server上
server.group(boss,worker)
// 指定服务端的通道为Nio类型
.channel(NioServerSocketChannel.class)
// 为到来的客户端Socket添加处理器
.childHandler(new ChannelInitializer<NioSocketChannel>() {
// 这个只会执行一次(主要是用于添加更多的处理器)
@Override
protected void initChannel(NioSocketChannel ch) {
// 添加一个字符解码处理器:对客户端的数据解码
ch.pipeline().addLast(
new StringDecoder(CharsetUtil.UTF_8));
// 添加一个入站处理器,对收到的数据进行处理
ch.pipeline().addLast(
new SimpleChannelInboundHandler<String>() {
// 读取事件的回调方法
@Override
protected void channelRead0(ChannelHandlerContext
ctx, String msg) {
System.out.println("收到客户端信息:" + msg);
}
});
}
});
// 为当前服务端绑定IP与端口地址(sync是同步阻塞至连接成功为止)
ChannelFuture cf = server.bind("127.0.0.1",8888).sync();
// 关闭服务端的方法(之后不会在这里关闭)
cf.channel().closeFuture().sync();
}finally {
// 优雅停止之前创建的两个Group
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
3 客户端
再构建一个NettyClient客户端,代码如下:
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
public class NettyClient {
public static void main(String[] args) {
// 由于无需处理连接事件,所以只需要创建一个EventLoopGroup
EventLoopGroup worker = new NioEventLoopGroup();
// 创建一个客户端(同之前的Socket、SocketChannel)
Bootstrap client = new Bootstrap();
try {
client.group(worker)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc)
throws Exception {
// 添加一个编码处理器,对数据编码为UTF-8格式
sc.pipeline().addLast(new
StringEncoder(CharsetUtil.UTF_8));
}
});
// 与指定的地址建立连接
ChannelFuture cf = client.connect("127.0.0.1", 8888).sync();
// 建立连接成功后,向服务端发送数据
System.out.println("正在向服务端发送信息......");
cf.channel().writeAndFlush("我是技术闲聊DD!");
} catch (Exception e){
e.printStackTrace();
} finally {
worker.shutdownGracefully();
}
}
}
4 启动运行
我们先启动服务端,再启动客户端,就会看到如下:
上面的案例,就是利用了Netty实现了简单的对端通信,实现的功能很简单。
5 代码流程
接下来我为大家解释一下Netty里面的一些核心概念。
EventLoopGroup
:负责管理Channel
的事件处理任务。管理多个EventLoop
的组件。它为网络应用程序提供了更好的并发支持和处理请求的能力ServerBootstrap/Bootstrap
:意思是引导,一个 Netty 应用通常由一个Bootstrap
开始,主要作用是配置整个 Netty 程序,串联各个组件,Netty 中Bootstrap
类是客户端程序的启动引导类,ServerBootstrap
是服务端启动引导类。childHandler
:可以理解成过滤器,在我们以前学习的Servlet编程中,新请求到来都会经过一个个的过滤器,而这个处理器也类似于之前的过滤器,新连接到来时,也会经过添加好的一系列处理器。
然后我为大家把上面案例的完整流程分析一下:
- 先创建两个
EventLoopGroup
事件组,然后创建一个ServerBootstrap
服务端。 - 将创建的两个事件组
boss
、worker
绑定在服务端上,并指定服务端通道为NIO类型
。 - 在server上添加处理器,对新到来的Socket连接进行处理,在这里主要分为两类:
ChannelInitializer
:连接到来时执行,主要是用于添加更多的处理器(只触发一次)
。addLast()
:通过该方式添加的处理器不会立马执行,而是根据处理器类型择机执行
。
- 为创建好的服务端绑定IP及端口号,调用
sync()
意思是阻塞至绑定成功为止。 - 再创建一个
EventLoopGroup
事件组,并创建一个Bootstrap客户端。 - 将事件组绑定在客户端上,由于无需处理连接事件,所以只需要一个事件组。
- 指定
Channel
通道类型为NIO
、添加处理器…(同服务端类似) - 与前面服务端绑定的地址建立连接,由于默认是异步的,也要调用
sync()
阻塞。 - 建立连接后,客户端将数据写入到通道准备发送,首先会先经过添加好的编码处理器,将数据的格式设为UTF-8。
- 服务器收到数据后,会先经过解码处理器,然后再去到入站处理,执行对应的
Read()
方法逻辑。 - 客户端完成数据发送后,先关闭通道,再优雅关闭创建好的事件组。
- 同理,服务端工作完成后,先关闭通道再停止事件组。
大家可以先看完我上面写的流程,然后再回到案例代码上去看,就会感觉到比较清晰。
注意:Netty大部分操作都是异步的,比如地址绑定、客户端连接。就类似于调用connect()方法与服务端建立连接时,主线程自己并不会去执行这个动作,而是会把这个工作交给事件组中的线程去完成,所以此刻如果主线程直接去向通道中写入数据,有几率会出现报错,在实际生产环境中,可能由于网络延迟导致连接建立的时间有些长,此时通道并未建立成功,因此尝试发送数据时就会有问题。
2. 核心组件 - 启动器与事件组
2.1 启动器 ServerBootstrap和Bootstrap
首先我们先看一下ServerBootstrap及Bootstrap的类继承结构图:
我们可以看一个表格,就会对其有所了解。
2.2 事件组EventLoopGroup
和EventLoop
EventLoopGroup
是一组 EventLoop
的集合,可以理解为 EventLoop
的管理器。其作用是负责创建和管理一组 EventLoop
对象,其中每个 EventLoop
对象都对应一个线程,用于处理网络 I/O 事件(例如读写数据、连接建立、连接关闭等)。在 Netty 中,通常会使用两种不同的 EventLoopGroup
:BossEventLoopGroup
和 WorkerEventLoopGroup
。其中 BossEventLoopGroup
负责处理连接建立事件,而 WorkerEventLoopGroup
负责处理连接已经建立后的数据读写事件。这样可以将连接建立处理与数据读写处理分开,提高并发性能。
EventLoop 则是一个重要的核心组件,用于处理网络 I/O 事件。它负责执行一系列的 I/O 操作,例如读取数据、写入数据、注册感兴趣的事件以及调度用户自定义的任务等。每个 EventLoop 都有一个独立的任务队列和计时器队列,并且运行在一个独立的线程上。同时,每个 EventLoop 还会被绑定到一个 NIO Selector 对象上,用于轮询网络 I/O 事件,处理事件后再将相应的任务加入到任务队列中。
注意:既然EventLoop/EventLoopGroup
继承自JDK原生的定时线程池,那也就代表着,它拥有JDK线程池中所有提供的方法,同时也应该会支持执行异步任务、定时任务的功能。
public static void main(String[] args) {
EventLoopGroup threadPool = new NioEventLoopGroup();
// 递交Runnable类型的普通异步任务
threadPool.execute(()->{
System.out.println("execute()方法提交的任务....");
});
// 递交Callable类型的有返回异步任务
threadPool.submit(() -> {
System.out.println("submit()方法提交的任务....");
return "我是执行结果噢!";
});
// 递交Callable类型的延时调度任务
threadPool.schedule(()->{
System.out.println("schedule()方法提交的任务,三秒后执行....");
return "调度执行后我会返回噢!";
},3, TimeUnit.SECONDS);
// 递交Runnable类型的延迟间隔调度任务
threadPool.scheduleAtFixedRate(()->{
System.out.println("scheduleAtFixedRate()方法提交的任务....");
},3,1,TimeUnit.SECONDS);
}
执行结果如下:
立即执行:
execute()方法提交的任务....
submit()方法提交的任务....
延时三秒后执行:
schedule()方法提交的任务....
scheduleAtFixedRate()方法提交的任务....
之后没间隔一秒执行:
scheduleAtFixedRate()方法提交的任务....
scheduleAtFixedRate()方法提交的任务....
上述我们创建了一个EventLoopGroup
事件循环组,然后通过之前JDK线程池提供的一系列的提交任务的方法,向其递交了几个异步任务,然后运行该程序,显然,EventLoopGroup
确实可以当做JDK原生线程池来使用。
还有几个方法:
EventLoop.inEventLoop(Thread)
:判断一个线程是否属于当前EventLoop。EventLoop.parent()
:判断当前EventLoop属于哪一个事件循环组。EventLoopGroup.next()
:获取当前事件组中的下一个EventLoop(线程)。
大家还记得在前面的示例代码中,我们定义了两个组,为什么吗?主要是定义两个组的好处在于:可以让Group中的每个EventLoop
分工更加明确,不同的Group分别处理不同类型的事件,各司其职。
为服务端绑定了两个事件循环组,也就代表着会根据ServerSocketChannel
上触发的不同事件,将对应的工作分发到这两个Group中处理,其中boss主要负责客户端的连接事件,而worker大多数情况下负责处理客户端的IO读写事件。
过程是:当客户端的SocketChannel连接到来时,首先会将这个注册事件的工作交给boss处理,boss会调用worker.register()
方法,将这条客户端连接注册到worker工作组中的一个EventLoop上。前面提到过,EventLoop
内部会维护一个Selector选择器,因此实际上也就是将客户端通道注册到其内部中的选择器上。
注意:将一个Socket连接注册到一个EventLoop上之后,这个客户端连接则会和这个EventLoop绑定,以后这条通道上发生的所有事件,都会交由这个EventLoop处理。
大家有没有思考一个问题,就是由于EventLoopGroup
本质上可以理解成一个线程池,其中存在的线程资源自然是有限的,那如果过来的客户端连接大于线程数量怎么办呢?答案是因为Netty本身是基于Java-NIO封装的,而NIO底层又是基于多路复用模型实现的,天生就能实现一条线程管理多个连接的功能,所以就算连接数大于线程数,也完全可以Hold住。
总结:简单来说可以EventLoop理解成有一条线程专门维护的Selector选择器,而EventLoopGroup
则可以理解成一个有序的定时调度线程池,负责管理所有的EventLoop
。
下一篇会为大家介绍Netty的通道处理器以及缓冲区,希望大家多多关注。