Netty
1 BIO&NIO模型
1.1 BIO
在JDK1.4出来之前,我们建立网络连接的时候采用BIO模式,需要先在服务端启动一个ServerSocket,然后在客户端启动Socket来对服务端进行通信,默认情况下服务端需要对每个请求建立一堆线程等待请求,而客户端发送请求后,先咨询服务端是否有线程响应,如果没有则会一直等待或者遭到拒绝请求,如果有的话,客户端线程会等待请求结束后才继续执行。
1.1.1 BIO-demo
1.1.1.1 SocketClient
创建com.sjb.bio.BIOSocketServer
public class BIOSocketServer {
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(9000);
while (true) {
System.out.println("等待连接...");
// 阻塞方法
Socket clientSocket = serverSocket.accept();
System.out.println("有客户端连接了...");
handler(clientSocket);
}
}
private static void handler(Socket clientSocket) {
byte[] bytes = new byte[1024];
System.out.println("准备read...");
// 接收客户端的数据,阻塞方法,没有数据可读时就阻塞
try {
int read = clientSocket.getInputStream().read(bytes);
System.out.println("read完毕...");
if (read != -1) {
System.out.println("接收到客户端的数据:" + new String(bytes, 0, read));
}
//clientSocket.getOutputStream().write("HelloClient".getBytes());
//clientSocket.getOutputStream().flush();
} catch (IOException e) {
e.printStackTrace();
}
}
}
测试一下,运行起来,用命令行
telnet localhost 9000
ctrl+】进入命令行模式
发送
send hello csdn-bblb
很基础的socket代码,没什么好说的。如果使用C语言完成网络编程的话这块应该非常熟悉。
但是我们这个代码有点问题,就是并发能力太弱了,自然而然的想到用线程来进行处理,创建线程或者线程池用来处理这个问题。
1.1.1.2 为SocketClient添加线程池
创建定长线程池
public class BIOSocketServer {
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(9000);
//创建定长线程池
ExecutorService executorService = Executors.newFixedThreadPool(20);
while (true) {
System.out.println("等待连接...");
// 阻塞方法
Socket clientSocket = serverSocket.accept();
System.out.println("有客户端连接了...");
executorService.execute(new SocketProcessor(clientSocket));
}
}
}
executorService.execute(new SocketProcessor(clientSocket));
new一个SocketProcessor对象再调用execute执行线程逻辑
创建com.sjb.bio.SocketProcessor处理socket逻辑
public class SocketProcessor implements Runnable{
private Socket socket;
public SocketProcessor(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
processSocket(socket);
}
private void processSocket(Socket clientSocket) {
//处理socket
byte[] bytes = new byte[1024];
System.out.println("准备read...");
// 接收客户端的数据,阻塞方法,没有数据可读时就阻塞
try {
int read = clientSocket.getInputStream().read(bytes);
System.out.println("read完毕...");
if (read != -1) {
System.out.println("接收到客户端的数据:" + new String(bytes, 0, read));
}
//clientSocket.getOutputStream().write("HelloClient".getBytes());
//clientSocket.getOutputStream().flush();
} catch (IOException e) {
e.printStackTrace();
}
}
}
测试运行三个客户端
显示客户端端口不一样,说明我们的线程还是有用的。
但是当我们在处理业务逻辑时,会遇到一些类似的问题C10K或者C10M,意思是瞬间有10K或者10M的线程需要并发处理,这时候我们的无论我们选择怎样的线程池,都会面临创建线程内存爆掉的问题。这时候就引入我们的NIO了。
1.2 NIO
-
同步非阻塞的IO(non-blocking IO)
- 三大核心部分:Channel(通道),Buffer(缓冲区), Selector(选择器)
- NIO面向缓冲区编程 ,增加了操作的灵活性。
- NIO读和写都是非阻塞的,当 读操作–>当前资源未就绪 或者 写操作–>当前未完全写入 时,当前线程都可以做其他处理。也就是说,多个请求发过来,不必分配相同线程数去处理请求(例如1000个请求,根据实际情况可以分配5-10个线程处理即可)。
-
NIO 和 BIO 对比
- BIO 以流的方式处理数据,而 NIO 以缓冲区的方式处理数据,缓冲区 I/O 的效率比流 I/O 高很多
- BIO 是阻塞的,NIO则是非阻塞的
- BIO 基于字节流和字符流进行操作,而 NIO 基于 Channel(通道)和 Buffer(缓冲区)进行操作,数据 总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择器)用于监听多个通道的 事件(比如:连接请求, 数据到达等),因此使用单个线程就可以监听多个客户端通道。
1.2.1 NIO-demo
1.2.2.1 NIOServer
创建com.sjb.nio.NIOServer
public class NIOServer {
//保存客户端连接
private static List<SocketChannel> clientList = new ArrayList<>();
public static void main(String[] args) throws IOException {
//创建NIO ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(9000));
//设置ServerSocketChannel为非阻塞
serverSocketChannel.configureBlocking(false);
System.out.println("服务启动成功");
while (true){
//非阻塞模式accept方法不会阻塞,否则会阻塞
//NIO的非阻塞就是通过设置为非阻塞模式,底层调用了Linux内核的accept函数
SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel != null){ //有客户端连接
System.out.println("连接成功");
//设置SocketChannel为非阻塞
socketChannel.configureBlocking(false);
//保存客户端连接在List中
clientList.add(socketChannel);
}
//遍历连接进行数据读取
Iterator<SocketChannel> iterator= clientList.iterator();
while (iterator.hasNext()){
SocketChannel sc = iterator.next();
//读取数据
ByteBuffer buffer = ByteBuffer.allocate(128);
//设置为非阻塞模式read方法不会阻塞
int len = sc.read(buffer);
//如果有数据,把数据打印出来
if (len > 0){
System.out.println("接收到消息:" + new String(buffer.array()));
}else if (len == -1){ //如果客户端断开连接,把SocketChannel从List中去掉
iterator.remove();
System.out.println("客户端断开连接");
}
}
}
}
}
这里非阻塞就可以让一个进程处理很多客户端连接,但是也有缺点:
- 没有客户端连接的时候,在空转
- 有和多客户端连接的时候,每次都要遍历list去找到谁发的read。
我们需要改成事件驱动。
1.2.2.2 NIOSelectorServer
创建com.sjb.nio.NIOSelectorServer
public class NIOSelectorServer {
public static void main(String[] args) throws IOException {
//创建NIO ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(9000));
//设置ServerSocketChannel为非阻塞
serverSocketChannel.configureBlocking(false);
//打开Selector处理Channel,即创建epoll
Selector selector = Selector.open();
//把ServerSocketChannel注册到selector上,并且selector对客户端accept连接操作感兴趣
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务启动成功");
while(true){
//阻塞等待需要处理的事件发生
selector.select();
//获取selector中注册的全部事件的 SelectionKey 实例
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
//遍历SelectionKey对事件进行处理
while (iterator.hasNext()){
SelectionKey key = iterator.next();
//如果是OP_ACCEPT事件,则进行连接获取和事件注册
if (key.isAcceptable()){
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = server.accept();
socketChannel.configureBlocking(false);
//这里只注册了读事件,如果需要给客户端发送数据可以注册写事件
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println("客户端连接成功");
}else if (key.isReadable()){ //如果是OP_READ事件,则进行读取和打印
SocketChannel client = (SocketChannel) key.channel();
client.configureBlocking(false);
ByteBuffer byteBuffer = ByteBuffer.allocate(128);
int len = client.read(byteBuffer);
//如果有数据,把数据打印出来
if (len > 0){
System.out.println("接收到消息:" + new String(byteBuffer.array()));
}else if (len == -1){ //如果客户端断开连接,关闭Socket
System.out.println("客户端断开连接");
client.close();
}
}
//从事件集合里删除本次处理的key,防止下次select重复处理
iterator.remove();
}
}
}
}
2 Epoll非阻塞反应堆
2.1 Epoll
2.1.1 LT 水平触发
当被监控的文件描述符上有可读写事件发生时,epoll_wait()会通知处理程序去读写。如果这次没有把数据一次性全部读写完(如读写缓冲区太小),那么下次调用 epoll_wait()时,它还会通知你在上没读写完的文件描述符上继续读写,当然如果你一直不去读写,它会一直通知你
2.1.2 ET 边缘触发
当被监控的文件描述符上有可读写事件发生时,epoll_wait()会通知处理程序去读写。如果这次没有把数据全部读写完(如读写缓冲区太小),那么下次调用epoll_wait()时,它不会通知你,也就是它只会通知你一次,直到该文件描述符上出现第二次可读写事件才会通知你
2.1.3 socket文件描述符
关于socket中的阻塞与非阻塞,首先明白的是,阻塞与非阻塞是文件(文件描述符)的性质,而不是函数的性质
-
客户端:
- connfd:客户端创建socket时候得到的文件描述符。connect使用这个描述符主动发起链接。
-
服务端:
- listenfd:创建socket得到的文件描述符,同时bind和listen使用的也是这个文件描述符
- clientfd:调用accept得到的文件描述符,也就是用于通信的文件描述符
int lfd = socket(AF_INET, SOCK_STREAM, 0);
fcntl(lfd, F_SETFL, O_NONBLOCK); //将socket设为非阻塞
2.2 Epoll非阻塞过程
简单来说就是 epoll的ET模式+非阻塞轮询+多态下针对不同的文件描述符的回调函数,反应堆不仅要监听读事件,也要监听写事件。
这里处理的事件是大小写转换。
2.2.1 服务器端
第一阶段:
- lfd=socket()创建lfd套接字
- bind()绑定地址
- listen()设置监听上限
- epoll_create()创建监听红黑树()
- epoll_ctl()把lfd加入红黑树
- while(1) 服务器端上线等待连接。
第二阶段:
- epoll_wait()服务器监听
- 有事件发生,epoll_wait()返回监听满足数组
- 一旦有事件发生则lfd一定在满足数组中,lfd进行accept()
- 用lfd进行accept()返回的连接套接字cfd放到红黑树中
- 执行读操作—>进行大小写转换操作—>把cfd节点的属性从EPOLLIN变为EPOLLOUT(可读变可写)
- 再把cfd重新挂上红黑树去监听写事件
- 等待epoll_wait()返回—>有返回说明能写—>执行写操作
- 把cfd从红黑树中拿下来—>再把cfd节点的属性从EPOLLOUT变为EPOLLIN
- 再把cfd重新挂上红黑树去监听读事件—>epoll_wait()服务器监听。
2.2.2 挂上红黑树的操作
void eventadd(int efd, int events, struct myevent_s *ev)
{
struct epoll_event epv = {0, {0}};
int op;
epv.data.ptr = ev;
epv.events = ev->events = events; //EPOLLIN 或 EPOLLOUT
if (ev->status == 0) { //已经在红黑树 g_efd 里
op = EPOLL_CTL_ADD; //将其加入红黑树 g_efd, 并将status置1
ev->status = 1;
}
if (epoll_ctl(efd, op, ev->fd, &epv) < 0) //实际添加/修改
printf("event add failed [fd=%d], events[%d]\n", ev->fd, events);
else
printf("event add OK [fd=%d], op=%d, events[%0X]\n", ev->fd, op, events);
return ;
}
epoll_ctl
是用来控制 epoll 实例的函数,可以用于添加、修改或删除文件描述符和其对应的事件。
efd
:epoll 实例的文件描述符,通过epoll_create
或者epoll_create1
函数创建得到。op
:操作类型,可以是 EPOLL_CTL_ADD、EPOLL_CTL_MOD、EPOLL_CTL_DELEPOLL_CTL_ADD
:添加一个新的文件描述符到 epoll 实例中。EPOLL_CTL_MOD
:修改一个已经在 epoll 实例中的文件描述符的事件属性。EPOLL_CTL_DEL
:从 epoll 实例中删除一个文件描述符。
ev->fd
:要进行操作的文件描述符。&epv
:指向struct epoll_event
结构体的指针,用于设置文件描述符的事件属性。
2.3 select、poll、epoll的区别
3 响应式编程
为什么Redis是单线程却能支持上万的并发?是因为Redis也是多路复用的机制。
3.1 Netty
Netty 是一个 NIO 客户端服务器框架,可快速轻松地开发网络应用程序,例如协议服务器和客户端。它极大地简化和简化了网络编程,例如 TCP 和 UDP 套接字服务器。
“快速简便”并不意味着最终的应用程序将遭受可维护性或性能问题的困扰。Netty 经过精心设计,结合了许多协议(例如FTP,SMTP,HTTP 以及各种基于二进制和文本的旧式协议)的实施经验。结果,Netty 成功地找到了一种无需妥协即可轻松实现开发,性能,稳定性和灵活性的方法。
3.1.1 核心组件
Channel
Channel是 Java NIO 的一个基本构造。可以看作是传入或传出数据的载体。因此,它可以被打开或关闭,连接或者断开连接。
EventLoop 与 EventLoopGroup
EventLoop 定义了Netty的核心抽象,用来处理连接的生命周期中所发生的事件,在内部,将会为每个Channel分配一个EventLoop。
EventLoopGroup 是一个 EventLoop 池,包含很多的 EventLoop。
Netty 为每个 Channel 分配了一个 EventLoop,用于处理用户连接请求、对用户请求的处理等所有事件。EventLoop 本身只是一个线程驱动,在其生命周期内只会绑定一个线程,让该线程处理一个 Channel 的所有 IO 事件。
一个 Channel 一旦与一个 EventLoop 相绑定,那么在 Channel 的整个生命周期内是不能改变的。一个 EventLoop 可以与多个 Channel 绑定。即 Channel 与 EventLoop 的关系是 n:1,而 EventLoop 与线程的关系是 1:1。
ServerBootstrap 与 Bootstrap
Bootstarp 和 ServerBootstrap 被称为引导类,指对应用程序进行配置,并使他运行起来的过程。Netty处理引导的方式是使你的应用程序和网络层相隔离。
-
Bootstrap 是客户端的引导类,Bootstrap 在调用 bind()(连接UDP)和 connect()(连接TCP)方法时,会新创建一个 Channel,仅创建一个单独的、没有父 Channel 的 Channel 来实现所有的网络交换。
-
ServerBootstrap 是服务端的引导类,ServerBootstarp 在调用 bind() 方法时会创建一个 ServerChannel 来接受来自客户端的连接,并且该 ServerChannel 管理了多个子 Channel 用于同客户端之间的通信。
ChannelHandler 与 ChannelPipeline
ChannelHandler 是对 Channel 中数据的处理器,这些处理器可以是系统本身定义好的编解码器,也可以是用户自定义的。这些处理器会被统一添加到一个 ChannelPipeline 的对象中,然后按照添加的顺序对 Channel 中的数据进行依次处理。
ChannelFuture
Netty 中所有的 I/O 操作都是异步的,即操作不会立即得到返回结果,所以 Netty 中定义了一个 ChannelFuture 对象作为这个异步操作的“代言人”,表示异步操作本身。如果想获取到该异步操作的返回值,可以通过该异步操作对象的addListener() 方法为该异步操作添加监 NIO 网络编程框架 Netty 听器,为其注册回调:当结果出来后马上调用执行。
Netty 的异步编程模型都是建立在 Future 与回调概念之上
3.2 Netty源码
3.2.1 NettyServer
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
//创建两个线程组bossGroup和workerGroup,含有的子线程NioEventLoop的个数默认为cpu核数的两倍
//bossGroup只是处理连接请求,真正的和客户端业务处理,会交给workerGroup完成
EventLoopGroup bossGroup =new NioEventLoopGroup(1);
EventLoopGroup workerGroup =new NioEventLoopGroup(10);
try{
//创建服务器端的启动对象
ServerBootstrap serverBootstrap = new ServerBootstrap();
//使用链式编程来配置参数
serverBootstrap.group(bossGroup, workerGroup) //设置两个线程组
//使用NioServerSocketChannel作为服务器的通道实现
.channel(NioServerSocketChannel.class)
//初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接
//多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
.option(ChannelOption.SO_BACKLOG, 1024)
//创建通道初始化对象,设置初始化参数
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//对workerGroup的SocketChannel设置处理器
ch.pipeline().addLast(new NettyServerHandler());
}
});
System.out.println("...服务器 is ready...");
//绑定一个端口并且同步,生成一个ChannelFuture对象,通过isDone()等方法可以判断异步事件的执行情况
//启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕
ChannelFuture channelFuture = serverBootstrap.bind(9000).sync();
System.out.println("...服务器 is starting...");
//对关闭通道进行监听
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
3.2.2 NettyServerHandler
ch.pipeline().addLast(new NettyServerHandler());
通过这个来添加处理方法进行回调
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("客户端连接通道建立成功...");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("Client Address ====== " + ctx.channel().remoteAddress());
ByteBuf buf = (ByteBuf) msg;
System.out.println("Client Message ====== " + buf.toString(CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}