Netty系列整体栏目
内容 | 链接地址 |
---|---|
【一】深入理解网络通信基本原理和tcp/ip协议 | https://zhenghuisheng.blog.csdn.net/article/details/136359640 |
【二】深入理解Socket本质和BIO | https://zhenghuisheng.blog.csdn.net/article/details/136549478 |
【三】深入理解NIO的基本原理和底层实现 | https://zhenghuisheng.blog.csdn.net/article/details/138451491 |
【四】深入理解反应堆模式的种类和具体实现 | https://zhenghuisheng.blog.csdn.net/article/details/140113199 |
反应堆模式的种类和具体实现
- 一,反应堆模式的种类和底层原理
- 1,单线程反应堆模式
- 2,单线程-work工作者线程池模式
- 3,多线程主从模式
- 4,redis中的reactor反应堆模式
一,反应堆模式的种类和底层原理
前面文章中讲解了什么是socket,socket的本质就是操作系统为我们开发人员提供的一些列api,内部封装了从客户端a 从传输层,网络层,数据链路层,物理层再到 客户端b 中的物理层,数据链路层,网络层再到传输层之间的内部协议,如下图,让开发人员秩序更加关注应用层之间的开发,不需要关注底层的具体实现
socket内部会在网络通信中,去实现 tcp的三次握手,处理丢包后的网络重传流量控制等
上一篇中讲解了Reactor反应堆模式的核心以及组成部分,接下来详细的讲解一下反应堆模式的种类以及底层的实现原理
1,单线程反应堆模式
首先第一种就是单线程的反应堆模式,就是说不管是客户端的网络连接,还是读取网络上的数据,或者说具体的相关的业务处理,都是通过一个线程里面负责和处理的
如下面这段代码,首先创建一个 ServerHandle 的线程任务类,并且在构造方法中设置对应的 selector 选择器和一个处理请求的 ServerSocketChannel 的通道,由于将该类作为服务端,并且为了设置是单线程的模式,因此将这个selector和socket设置为唯一
public class ServerHandle implements Runnable{
private Selector selector;
private ServerSocketChannel serverChannel;
/**
* 构造方法
* @param port 指定要监听的端口号
*/
public ServerHandle(int port) {
try{
//创建选择器
selector = Selector.open();
serverChannel = ServerSocketChannel.open();打开监听通道
serverChannel.configureBlocking(false);//开启非阻塞模式
//绑定端口 backlog设为1024
serverChannel.socket().bind(new InetSocketAddress(port),1024);
//监听客户端连接请求
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
started = true
}catch(IOException e){
e.printStackTrace();
System.exit(1);
}
}
}
随后再这个类里面重写这个 run 方法,都是遍历这个selector 轮询器,获取里面的读写或者监听事件,将即将处理的事件从轮询器中删除,如果抛出异常则取消这个key的事件处理,事件全部处理完成则将轮询器close关闭
@Override
public void run() {
//循环遍历selector
while(started){
try{
//阻塞,只有当至少一个注册的事件发生的时候才会继续.
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
SelectionKey key = null;
while(it.hasNext()){
key = it.next();
it.remove();
try{
handleInput(key);
}catch(Exception e){
if(key != null){
key.cancel();
if(key.channel() != null){
key.channel().close();
}
}
}
}
}catch(Throwable t){
t.printStackTrace();
}
}
//selector关闭后会自动释放里面管理的资源
if(selector != null){
try{
selector.close();
}catch (Exception e) {
e.printStackTrace();
}
}
真正处理时间的方法在这个 handleInput 方法中,里面会去判断这个key是属于哪一个事件的,比如是读事件,还是写事件,还是监听事件,都会进行相应的处理。
private void handleInput(SelectionKey key) throws IOException{
if(key.isValid()){
//处理新接入的请求消息
if(key.isAcceptable()){
//获得关心当前事件的channel
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//通过ServerSocketChannel的accept创建SocketChannel实例
//完成该操作意味着完成TCP三次握手,TCP物理链路正式建立
SocketChannel sc = ssc.accept();
System.out.println("======socket channel 建立连接=======");
//设置为非阻塞的
sc.configureBlocking(false);
//连接已经完成了,可以开始关心读事件了
sc.register(selector, SelectionKey.OP_READ);
}
//读消息
if(key.isReadable()){
System.out.println("======socket channel 数据准备完成," +
"可以去读==读取=======");
SocketChannel sc = (SocketChannel) key.channel();
//创建ByteBuffer,并开辟一个1M的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
//读取请求码流,返回读取到的字节数
int readBytes = sc.read(buffer);
//读取到字节,对字节进行编解码
if(readBytes>0){
//将缓冲区当前的limit设置为position,position=0,
// 用于后续对缓冲区的读取操作
buffer.flip();
//根据缓冲区可读字节数创建字节数组
byte[] bytes = new byte[buffer.remaining()];
//将缓冲区可读字节数组复制到新建的数组中
buffer.get(bytes);
String message = new String(bytes,"UTF-8");
System.out.println("服务器收到消息:" + message);
//处理数据
String result = response(message) ;
//发送应答消息
doWrite(sc,result);
}
//链路已经关闭,释放资源
else if(readBytes<0){
key.cancel();
sc.close();
}
}
if(key.isWritable()){
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer)key.attachment();
if(buffer.hasRemaining()){
int count = sc.write(buffer);
System.out.println("write :"+count
+"byte, remaining:"+buffer.hasRemaining());
}else{
/*取消对写的注册*/
key.interestOps(SelectionKey.OP_READ);
}
}
}
}
在写事件完成之后,buffer缓冲区会有新的空间,因此可以将读到的数据写入到写缓冲区中,由于tcp全双工的特性,因此可以实现服务端边读边写的功能。
//发送应答消息
private void doWrite(SocketChannel channel,String response)
throws IOException {
//将消息编码为字节数组
byte[] bytes = response.getBytes();
//根据数组容量创建ByteBuffer
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
//将字节数组复制到缓冲区
writeBuffer.put(bytes);
//flip操作
writeBuffer.flip();
channel.register(selector,SelectionKey.OP_WRITE|SelectionKey.OP_READ,
writeBuffer);
}
通过以上案例,可以完整的说明白单线程是完全可以实现一个bio的,通过Reactor单线程反应堆的模式去完成所有的请求和连接。
虽然单线程可以完整的实现整个BIO的流程,但是单线程也有单线程的弊端,如由于业务响应也是通过单线程去处理,如果涉及到某个业务需要花费太长的时间,那么整个系统就会处于一个阻塞的状态,只有等这个任务结束之后,才能继续进行下一步的任务。因此单线程的反应堆模式也是有却缺陷的
2,单线程-work工作者线程池模式
在纯粹的单线程模式中,可能会因为某一个具体的业务导致整个系统处于一个阻塞的瘫痪状态,都是因为全部任务都共用一个线程,那么这就好办了,就是只有监听事件,读事件和写事件共用同一个单线程,如果是涉及到需要处理业务的任务,那么就将这部分丢到线程池中,通过线程池中的线程去处理,这样就不会影响主线程的执行,并且通过这种异步的方式,从而增快主线程处理任务的效率,提升整个系统的响应
这部分代码不做详细解释,就是再创建一个 ServerHandleWorker 的Task任务类,并且实现Callable接口,然后再这个类中重写的run方法去做对应的业务即可
3,多线程主从模式
在2中已经对1进行了很大的优化,但是在reacotr模式的线程中,还是需要处理read读事件和write写事件,因此为了让这个reactor单线程更快,那么又可以将一个线程拆分成两个线程,让主reactor只需要负责处理接收事件,让从reactor异步的去处理读事件和写事件,然后处理其他业务的事件存放在线程池中,从而完全的提升整个系统的吞吐量和效率
4,redis中的reactor反应堆模式
在redis的6.0之前,redis内部就是使用一个标准的单线程 reactor 反应堆模式,通过一个线程去执行连接事件,读事件,写事件和其他的一些业务事件等等
在redis6.0开始,redis内部多线程的主从模式基本一致,通过mainReactor主线程处理接收事件和读事件,但是由子线程去执行读事件和写事件,同时业务线程还是通过mainReactor主线程去执行,从而保证减少一些并发冲突