Netty
传统的IO模型的web容器,比如老版本的Tomcat,为了增加系统的吞吐量,需要不断增加系统核心线程数量,或者通过水平扩展服务器数量,来增加系统处理请求的能力。有了NIO之后,一个线程即可处理多个连接事件,其中基于多路复用模型的Netty框架,不仅降低了使用NIO的复杂度,
优点
Netty是一款以java NIO为基础,基于事件驱动模型支持异步、高并发的网络应用框架。
-
API使用简单,开发门槛低,简化了NIO开发网络程序的复杂度
-
功能强大,预置多种编解码功能,支持多种主流协议,比如Http、WebSocket。
-
定制能力强,可以通过ChannelHandler对通信框架灵活扩展。
-
性能高,支持异步非阻塞通信模型
-
成熟稳定,社区活跃,已经修复了Java NIO所有的Bug。
-
经历了大规模商业应用的考验,质量有保证。
Reactor模型
Reactor模型也叫做反应器设计模式,是一种为处理服务请求并发提交到一个或者多个服务处理器的事件设计模式。
而Netty是基于Reactor模型设计的一套高性能网络应用框架,下面来看下常见的Reactor模型有哪些:
一、单Reactor单线程
1)可以实现通过一个阻塞对象监听多个链接请求
2)Reactor对象通过select监听客户端请求事件,通过dispatch进行分发
3)如果是建立链接请求,则由Acceptor通过accept处理链接请求,然后创建一个Handler对象处理完成链接后的各种事件
4)如果不是链接请求,则由Reactor分发调用链接对应的Handler来处理
5)Handler会完成Read->业务处理->send的完整业务流程
二、单Reactor多线程
1)Reactor对象通过select监听客户端请求事件,收到事件后,通过dispatch分发
2)如果是建立链接请求,则由Acceptor通过accept处理链接请求,然后创建一个Handler对象处理完成链接后的各种事件
3)如果不是链接请求,则由Reactor分发调用链接对应的Handler来处理
4)Handler只负责事件响应不做具体业务处理
5)通过read读取数据后,分发到worker线程池处理,处理完成后返回给Handler,Handler收到后,通过send将结果返回给client
三、主从Reactor多线程
1)Reactor主线程MainReactor对象通过select监听链接事件,通过Acceptor处理
2)当Acceptor处理链接事件后,MainReactor将链接分配给SubReactor
3)SubReactor将链接加入到队列进行监听,并创建Handler进行事件处理
4)当有新事件发生时,SubReactor就会调用对应的Handler处理
5)Handler通过read读取数据,分发到worker线程池处理,处理完成后返回给Handler,Handler收到后,通过send将结果返回给client
6)Reactor主线程可以对应多个Reactor子线程
三种模式用生活案例来理解:
1)单Reactor单线程,前台接待员和服务员是同一个人,全程为顾客服务
2)单Reactor多线程,1个前台接待员,多个服务员,接待员只负责接待
3)主从Reactor多线程,多个前台接待员,多个服务员
Reactor模型具有如下优点:
1)响应快,不必为单个同步事件所阻塞,虽然Reactor本身依然是同步的
2)可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销
3)扩展性好,可以方便的通过增加Reactor实例个数来充分利用CPU资源
4)复用性好,Reactor模型本身与具体事件处理逻辑无关,具有很高的复用性
核心组件
Bootstrap
一个Netty应用通常由一个Bootstrap开始,它主要作用是配置整个Netty程序,串联起各个组件。
Handler
为了支持各种协议和处理数据的方式,便诞生了Handler组件。Handler主要用来处理各种事件,这里的事件很广泛,比如可以是连接、数据接收、异常、数据转换等。
ChannelInboundHandler
一个最常用的Handler。这个Handler的作用就是处理接收到数据时的事件,也就是说,我们的业务逻辑一般就是写在这个Handler里面的,ChannelInboundHandler就是用来处理我们的核心业务逻辑。
ChannelInitializer
当一个链接建立时,我们需要知道怎么来接收或者发送数据,当然,我们有各种各样的Handler实现来处理它,那么ChannelInitializer便是用来配置这些Handler,它会提供一个ChannelPipeline,并把Handler加入到ChannelPipeline。
ChannelPipeline
一个Netty应用基于ChannelPipeline机制,这种机制需要依赖于EventLoop和EventLoopGroup,因为它们三个都和事件或者事件处理相关。
EventLoops
目的是为Channel处理IO操作,一个EventLoop可以为多个Channel服务。EventLoopGroup由多个EventLoop组成。
Channel
代表了一个Socket链接,或者其它和IO操作相关的组件,它和EventLoop一起用来参与IO处理。
Future
在Netty中所有的IO操作都是异步的,因此,你不能立刻得知消息是否被正确处理,但是我们可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过Future和ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发。
示例
下面通过一个简单的示例,了解怎么基于Netty如何开发一个简单的网络通信程序。和之前一样,包含服务端与客户端:
Server:
@Slf4j
public class Server {
private EventLoopGroup boosGroup;
private EventLoopGroup workGroup;
public Server(int port){
try {
init(port);
log.info("----- 服务启动成功 -----");
} catch (InterruptedException e) {
log.error("启动服务出错:{}", e.getCause());
}
}
private void init(int port) throws InterruptedException {
// 处理连接
this.boosGroup = new NioEventLoopGroup();
// 处理业务
this.workGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
// 绑定
bootstrap.group(boosGroup, workGroup)
.channel(NioServerSocketChannel.class) //配置服务端
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_RCVBUF, 1024)
.childOption(ChannelOption.SO_SNDBUF, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ServerHandler());
}
});
ChannelFuture channelFuture = bootstrap.bind(port).sync();
channelFuture.channel().closeFuture().sync();
}
public void close(){
this.boosGroup.shutdownGracefully();
this.workGroup.shutdownGracefully();
}
}
@Slf4j
class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info(">>>>>>> server active");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//1. 读取客户端的数据(缓存中去取并打印到控制台)
ByteBuf buf = (ByteBuf) msg;
byte[] request = new byte[buf.readableBytes()];
buf.readBytes(request);
String requestBody = new String(request, "utf-8");
log.info(">>>>>>>>> receive message: {}", requestBody);
//2. 返回响应数据
ctx.writeAndFlush(Unpooled.copiedBuffer((requestBody+" too").getBytes()));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
}
Client:
@Slf4j
public class Client {
private EventLoopGroup workGroup;
private ChannelFuture channelFuture;
public Client(int port){
init(port);
}
private void init(int port){
this.workGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.option(ChannelOption.SO_RCVBUF, 1024)
.option(ChannelOption.SO_SNDBUF, 1024)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ClientHandler());
}
});
this.channelFuture = bootstrap.connect("127.0.0.1", port).syncUninterruptibly();
}
/**
*
* @param message
*/
public void send(String message){
this.channelFuture.channel().writeAndFlush(Unpooled.copiedBuffer(message.getBytes()));
}
/**
*
*/
public void close(){
try {
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
workGroup.shutdownGracefully();
}
}
@Slf4j
class ClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info(">>>>>>> client active");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "utf-8");
log.info(">>>>>>>>> receive message: {}", body);
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
}
测试:
public class StarterTests {
static int port = 9011;
@Test
public void startServer(){
Server server = new Server(9011);
}
@Test
public void startClient(){
Client client = new Client(port);
client.send("Hello Netty!");
while (true){}
}
}
主要实现步骤:
1、创建两个NIO线程组,一个专门用来网络事件处理(接受客户端连接),另一个则进行网络通讯读写
2、创建一个ServerBootstrap对象,配置Netty的一系列参数,例如接收传入数据的缓存大小等。
3、创建一个实际处理数据的类ChannelInitializer,进行初始化的准备工作,比如设置传入数据的字符集格式,实现具体处理数据的接口。
4、绑定端口,执行同步阻塞方法等待服务器启动即可
类似技术
Mina、Netty、Grizzly
1、Mina
Mina(Multipurpose Infrastructure for Network Applications) 是 Apache 组织一个较新的项目,它为开发高性能和高可用性的网络应用程序提供了非常便利的框架。当前发行的 Mina 版本2.04支持基于 Java NIO 技术的 TCP/UDP 应用程序开发、串口通讯程序,Mina 所支持的功能也在进一步的扩展中。
目前,正在使用Mina的应用包括:Apache Directory Project、AsyncWeb、AMQP(Advanced Message Queuing Protocol)、RED5 Server(Macromedia Flash Media RTMP)、ObjectRADIUS、 Openfire等等。
2、Netty
Netty是一款异步的事件驱动的网络应用框架和工具,用于快速开发可维护的高性能、高扩展性协议服务器和客户端。也就是说,Netty是一个NIO客户端/服务器框架,支持快速、简单地开发网络应用,如协议服务器和客户端。它极大简化了网络编程,如TCP和UDP套接字服务器。
3、Grizzly
Grizzly是一种应用程序框架,专门解决编写成千上万用户访问服务器时候产生的各种问题。使用JAVA NIO作为基础,并隐藏其编程的复杂性。容易使用的高性能的API。带来非阻塞socketd到协议处理层。利用高性能的缓冲和缓冲管理使用高性能的线程池。
从设计的理念上来看,Mina的设计理念是最为优雅的。当然,由于Netty的主导作者与Mina的主导作者是同一人,出自同一人之手的Netty在设计理念上与Mina基本上是一致的。而Grizzly在设计理念上就较差了点,几乎是JavaNIO的简单封装。
结束语
前面我们说到了各种IO模型,基于Java NIO开发的网络应用程序,能够更好的基于服务器操作系统底层支持,更好的利用服务器资源提供高性能、可靠易扩展的应用。而Netty在Java NIO的基础上,为开发这提供便捷提升开发的效率与稳定性,在很多主流的框架中都有被使用,比如Dubbo、Spring Webflux等