写在前面
netty当前是网络io框架的事实标准,基于nio实现,框架的作者是韩国一位姓李的朋友,开始我们这位行李的韩国朋友开发一个io框架mina,但后来其离职,mina也就和其没有关系了,所以后来其改进了mina的不足和各种问题,重新开发了一个全新的框架,也就是现在的netty。本文就一起看下其简单的使用和底层原理。
1:实现一个http server
能够在浏览器访问并给出响应的的server我们就可以叫做http server,即只要简单的支持http协议即可,netty对此提供了支持。
首先定义启动类:
/**
* 通过netty实现一个http 的server
*/
public class NettyHttpServer {
public static void main(String[] args) throws InterruptedException {
int port = 8808;
// boss 线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(2);
// 工作线程组
EventLoopGroup workerGroup = new NioEventLoopGroup(16);
try {
// 入口启动类
ServerBootstrap b = new ServerBootstrap();
// 设置各种配置
b.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.SO_RCVBUF, 32 * 1024)
.childOption(ChannelOption.SO_SNDBUF, 32 * 1024)
.childOption(EpollChannelOption.SO_REUSEPORT, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new HttpInitializer());
Channel ch = b.bind(port).sync().channel();
System.out.println("开启netty http服务器,监听地址和端口为 http://127.0.0.1:" + port + '/');
ch.closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
这里需要一个初始化类来绑定ChannelHandler到channel对应的pipieline上,如下:
public class HttpInitializer extends ChannelInitializer<SocketChannel> {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new HttpServerCodec());
//p.addLast(new HttpServerExpectContinueHandler());
p.addLast(new HttpObjectAggregator(1024 * 1024));
p.addLast(new HttpHandler());
}
}
这里增加了HttpServerCodec编解码器,来支持http协议,并定义http响应的业务处理类HttpHandler,如下:
public class HttpHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
//logger.info("channelRead流量接口请求开始,时间为{}", startTime);
FullHttpRequest fullRequest = (FullHttpRequest) msg;
String uri = fullRequest.uri();
//logger.info("接收到的请求url为{}", uri);
if (uri.contains("/test")) {
handlerTest(fullRequest, ctx, "hello,kimmking");
} else {
handlerTest(fullRequest, ctx, "hello,others");
}
} catch(Exception e) {
e.printStackTrace();
} finally {
ReferenceCountUtil.release(msg);
}
}
private void handlerTest(FullHttpRequest fullRequest, ChannelHandlerContext ctx, String body) {
FullHttpResponse response = null;
try {
String value = body; // 对接上次作业的httpclient或者okhttp请求另一个url的响应数据
// httpGet ... http://localhost:8801
// 返回的响应,"hello,nio";
// value = reponse....
response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(value.getBytes("UTF-8")));
response.headers().set("Content-Type", "application/json");
response.headers().setInt("Content-Length", response.content().readableBytes());
} catch (Exception e) {
System.out.println("处理出错:"+e.getMessage());
response = new DefaultFullHttpResponse(HTTP_1_1, NO_CONTENT);
} finally {
if (fullRequest != null) {
if (!HttpUtil.isKeepAlive(fullRequest)) {
ctx.write(response).addListener(ChannelFutureListener.CLOSE);
} else {
response.headers().set(CONNECTION, KEEP_ALIVE);
ctx.write(response);
}
ctx.flush();
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
接着我们启动主类来进行测试,先curl:
C:\WINDOWS\system32>curl http://localhost:8808/
hello,others
然后sb压测:
C:\WINDOWS\system32>sb -u http://localhost:8808/ -c 40 -N 10
Starting at 2023/6/1 星期四 13:36:01
[Press C to stop the test]
96867 (RPS: 7115.8)
---------------Finished!----------------
Finished at 2023/6/1 星期四 13:36:15 (took 00:00:13.7040309)
Status 200: 96880
RPS: 8778.8 (requests/second)
Max: 220ms
Min: 0ms
Avg: 0.2ms
50% below 0ms
60% below 0ms
70% below 0ms
80% below 0ms
90% below 0ms
95% below 0ms
98% below 1ms
99% below 3ms
99.9% below 16ms
性能还不错。
2:netty的模型
最简单的阻塞io模型,每个线程都要通过while(true)
的方式来获取数据以及处理数据,其中的获取数据过程,即IO操作是阻塞的,这就会导致线程占用CPU资源没有真正的执行业务,导致CPU的使用率降低,如下图:
如果是我们能够将IO的操作抽取出来,就能解决IO阻塞业务线程导致性能降低的问题了,这个问题我们可以使用NIO来解决,通过其selector机制来查看所有的socket的数据状态,这样就能解决业务线程因为IO阻塞而导致的CPU利用率降低问题,如下图:
在继续往下分析之前,我们先看下事件处理机制,事件处理机制,根据不同的事件来生成不同的任务,并将任务放在任务队列中,然后任务分发器按照任务不同的类型放到不同的任务处理器的任务队列中,如下图:
结合事件处理机制以及NIO机制就产生了reactor模式,doug lea针对reactor模式提出了3三种reactor线程模型,分别来看下。
2.1:单线程reactor线程模型
单线程的reactor线程模型,只有一个reactor线程,负责网络连接状态的维护,数据的读取,以及有数据时业务代码的执行,执行结果的发送,如下图:
此时只有一个线程,所以阻塞的io操作和业务执行还是耦合在一起的,效果同阻塞io模型,所以性能比较低,此时可以将其进化为reactor多线程模型。
2.2:reactor多线程模型
reactor多线程模型,不同于reactor单线程模型,会有一个专门的线程池从已经准备就绪的channle中(注意是已经是有数据可读取了,所以此处io不会阻塞,而只是从channle中read数据)
读取数据并执行业务代码,这样就解耦了阻塞式的io操作和业务操作,并且业务操作是以线程池的方式来执行的,所以效率会比较高,如下图:
红线以上区域就是reactor单线程模型的部分,红线下方就是扩展出来的线程池。也可参考下图:
这种方式还有一个性能瓶颈点就是,负责网络连接状态维护的reactor线程依然是单线程的,这个reactor线程要负责连接状态的维护,数据可读状态的检测,通道数据可读事件的分发(即实现分发器)
,负责处理的工作还是比较多的,特别是数据可读事件的分发,当并发量比较高时,可能成为性能瓶颈,因此我们可以将这些功能进行拆分,进化为reactor主从模型。
2.3:reactor主从模型
reactor的主从模型在reactor多线程模型的基础上,对rector线程的功能进行了拆分,其中网络连接状态的维护依然由mainReactor线程负责,但是channel数据的可读状态的检测,通道可读事件的分发,由一个单独的线程池subReactor负责,负责业务执行的线程池和reactor多线程模型相同,如下图:
2.4:netty对3中reactor线程模型的支持
netty使用了reactor线程模型,并支持了其具体的3种线程模型,编程方式类似,如下图:
当使用reactor的主从模式时netty启动过程如下:
这里有一个很重要的对象,NIOEventLoop是一个thread+selector的集合,负责不断的处理任务并从channel中读取数据执行业务代码。接着看下netty的整体信息:
信息如下:
1:很多的client连接到boss group,即mainReactor
2:workergourp,即subReactor,负责channel数据可读状态的检测,并分发数据处理任务到EventLoop的任务队列中
3:如果EventLoop判断当前任务对应的channel是自己负责的,则从channle中读取数据,并调用与该channel绑定的pipeline,并执行其中N多的ChannelHanlder(包含了我们的业务执行代码)。
netty的关键对象如下:
1:启动
服务端ServerBootstrap,客户端Bootstrap
2:事件处理
EventLoopGroup,封装一组EventLoop
EventLoop,负责一组channel的数据的处理
3:通道
SocketChannel,绑定在EventLoop上,代表一个网络连接
4:业务处理
ChannelPipeline,绑定在SocketChannel上,负责管理一组业务处理的ChannelHandler
ChannelHandler,多个,绑定在ChannelPipeline,负责具体的业务处理
5:绑定ChannelPipeline到SocketChannel
ChannelInitailizer,完成绑定工作
如下图:
另外就是ChannelPipeline还分为是读入数据还是写出数据,分为InboundChannelHandler和OutBoundChannleHandler,在编程时需要注意:
写在后面
参考文章列表
Reactor之多工作线程与主从模式 。