在sofa-jraft中,关于RPC的服务端是RpcServer
在RpcServer中的init方法中:
初始化了连接事件监听器,这个里面就是一个map,然后可以添加事件监听的处理器,初始化userProcessors, codec 是一个编码和解码器的工厂,可以获取编码器和解码器,然后他调用了父类的构造器super(port);
父类AbstractRemotingServer则是初始化了ip,port, options容器,包含BitSet的globalSwitch 全局开关,配置容器,configContainer,实现了接口RemotingServer : 这个接口主要是可以注册处理器
到此,已经完成了初始化,就是初始化一些map,开关,编码解码器,等,而AbstractRemotingServer继承了AbstractLifeCycle:我们来看下这个lifeCycle:
就是通过Atomic变量来控制启动和关闭
那么来看下start()方法,这个方法是RemotingServer的,实现在AbstractRemotingServer中,调用了startup方法(覆盖LifeCycle的方法),所以这两个方法其实是一样的,然后是doInit方法,这个是抽象方法,由RpcServer实现:
首先是根据是否管理Connection参数来初始化连接管理器,无论是否管理连接,都会初始化Eventhandler ,这里面实现了一些连接事件,大多数事件是没有自定义处理,都是打印日志
initRpcRemoting() 这个是初始化RpcRemoting也就是远程调用,这个可以后面说
这里就是初始化Netty的bootstrap,包括设置tcp参数,接收队列发送队列大小等,并设置高水位和低水位
这里是设置触发模式,水平触发还是边缘触发,如果是epoll,则设置为水平触发,否则设置为边缘触发
这里科普一下水平触发和边缘触发:
首先我们的网络模型select / poll, epoll 会有一个读取的调用,他们的方法都不一样,比如epoll的epoll_read ,如果是水平触发,那么如果是有一大串数据过来之后,读取一次没有读取完,那么就又会有一次读就绪事件,如果每次发送的数据量比较大,那么这种读事件就会有很多,会浪费很多的系统资源(文件描述符,事件触发也会有消耗)
如果是边缘触发,那么数据没读完,不会有事件过来,也就是调用epoll_read不会有返回,这样就需要自己在一次触发的时候吧缓冲空间的数据读完。
那么水平触发的场景可以是短报文场景,边缘触发可以是大报文场景
水平触发优、缺点及应用场景:
优点:当进行socket通信的时候,保证了数据的完整输出,进行IO操作的时候,如果还有数据,就会一直的通知你。
缺点:由于只要还有数据,内核就会不停的从内核空间转到用户空间,所有占用了大量内核资源,试想一下当有大量数据到来的时候,每次读取一个字节,这样就会不停的进行切换。内核资源的浪费严重。效率来讲也是很低的。
LT要避免写的死循环问题,写缓冲区满的概率很低,只要缓冲区没满(空闲)就会不断发信号,所以写完数据后一定要记得取消写事件。
边沿触发优、缺点及应用场景:
优点:每次内核只会通知一次,大大减少了内核资源的浪费,提高效率。
缺点:不能保证数据的完整。不能及时的取出所有的数据。
应用场景:处理大数据。使用non-block模式的socket。
ET要避免"short read"的问题,比如用户收到100个字节,触发了一次边沿警告,读了50个字节,剩下50个字节没读。但是没有警告。
接下来是设置时间处理器,ServerIdleHandler是当服务端发生了Idle事件的时候关闭链接,RpcHandler初始化的时候将userProcessor传入了参数
收到消息的以后会进行消息分发,我们后面单独看ProtocolCode及消息的分发处理,而在最后如果有新的连接,怎会加入到connectionmanager中
回到Abstract的startup,最后则是进行启动,而这个启动doStart()是在RpcServer中,绑定端口,启动用户处理器等
接下来先分析编码器和解码器:
编码器:
首先会从channel中获取协议版本,然后通过协议管理器来获取编码器,
两个版本的编码器,在字节流的首个字节都写入了PROTOCOL_CODE,以及content和header的长度
解码器:
解码器的流程是,先解析出协议的版本,将协议code写入channel的attribute,然后重置读指针,获取版本对应的解码器进行解码,而解码的流程也比较简单,逐个获取各个数据,根据长度将数据填入对象中
再一个就是用户处理器:
收到消息的事件处理器是RpcHandler,通过协议获取到RpcCommandHandler,在handle中,通过
来获取processor,在解码器中已经解析出了commandType,
而processorManager是RpcCommandHandler初始化的时候构造的,并注入了几个默认的processor
而在RPC_REQUESTprocessor中,
首先通过request的类型获取用户processor中的处理器来处理
而后就是增加一些选择器以及线程池选择,这里看下userprocessor到底有哪些
其实这里的userprocessor中有很多都是内部类,每一种消息类型都有对应的处理器,举个例子
选举投票有预选举,预选举跟选举的区别是不增长任期,并且设置preVote为true,避免网络发生隔离后重新加入集群,增加任期选举时因为任期无线增大的情况
这里封装的Request是RequestVoteRequest,rpcService.preVote 会在消息发送后执行done的Run方法
在构造RaftGroupService的时候,也可以是在通过RaftRpcServerFactory来创建RpcServer的时候:
会提前注入很多相关的处理器
而在回调的时候会调用processor的handleRequest方法,我们来看RequestVoteRequestProcessor:
handleRequest 在父类的父类RpcRequestProcessor中,调用子类的processRequest,接着再调用子类RequestVoteRequestProcessor的processRequest0方法,这里面则是具体的处理逻辑方法
我们的消息格式都是RpcRequestCommand,通过这个来进行编码和解码,而里面的requestClass,是在构造Command的时候注入的:
这是客户端发送消息时的最后转换处理,
解码后根据requestclass对应processor的interest
这样逻辑就能对应上了