前言
什么是RPC
RPC(Remote Procedure Call)远程过程调用,简言之就是像调用本地方法一样调用远程服务。目前外界使用较多的有gRPC、Dubbo、Spring Cloud等。相信大家对RPC的概念都已经很熟悉了,这里不做过多介绍。
为啥要自己写
为什么要自己写一个RPC框架,dubbo难道满足不了你?我觉得从个人成长上说,如果一个程序员能清楚的了解RPC框架所具备的要素,掌握RPC框架中涉及的服务注册发现、负载均衡、序列化协议、RPC通信协议、Socket通信、异步调用、熔断降级等技术,可以全方位的提升基本素质。虽然也有相关源码,但是只看源码容易眼高手低,动手写一个才是自己真正掌握这门技术的最优路径。(换个角度想想dubbo真的适合你们项目吗?有多少人觉得dubbo只是比feign性能好,更主流,大公司都用等等理由才用的?)
RPC框架要素
一款分布式RPC框架离不开三个基本要素:
- 服务提供方 Serivce Provider
- 服务消费方 Servce Consumer
- 注册中心 Registery
围绕上面三个基本要素可以进一步扩展服务路由、负载均衡、服务熔断降级、序列化协议、通信协议等等。
-
注册中心
主要是用来完成服务注册和发现的工作。虽然服务调用是服务消费方直接发向服务提供方的,但是现在服务都是集群部署,服务的提供者数量也是动态变化的,所以服务的地址也就无法预先确定。因此如何发现这些服务就需要一个统一注册中心来承载。
-
服务提供方(RPC服务端)
其需要对外提供服务接口,它需要在应用启动时连接注册中心,将服务名及其服务元数据发往注册中心。同时需要提供服务服务下线的机制。需要维护服务名和真正服务地址映射。服务端还需要启动Socket服务监听客户端请求。
-
服务消费方(RPC客户端)
客户端需要有从注册中心获取服务的基本能力,它需要在应用启动时,扫描依赖的RPC服务,并为其生成代理调用对象,同时从注册中心拉取服务元数据存入本地缓存,然后发起监听各服务的变动做到及时更新缓存。在发起服务调用时,通过代理调用对象,从本地缓存中获取服务地址列表,然后选择一种负载均衡策略筛选出一个目标地址发起调用。调用时会对请求数据进行序列化,并采用一种约定的通信协议进行socket通信。
技术选型
注册中心
目前成熟的注册中心有Zookeeper,Nacos,Consul,Eureka,它们的主要比较如下:
我们这里采用nacos
IO通信框架
本实现采用Netty作为底层通信框架,Netty是一个高性能事件驱动型的非阻塞的IO(NIO)框架。
通信协议
TCP通信过程中会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。所以需要对发送的数据包封装到一种通信协议里。
业界的主流协议的解决方案可以归纳如下:
- 消息定长,例如每个报文的大小为固定长度100字节,如果不够用空格补足。
- 在包尾特殊结束符进行分割。
- 将消息分为消息头和消息体,消息头中包含表示消息总长度(或者消息体长度)的字段。
很明显1,2都有些局限性,本实现采用方案3,自定义协议
序列化协议
常见的协议有JavaSerializer、json、Protobuf及Hessian。建议选用Protobuf,其序列化后码流小性能高,非常适合RPC调用,Google自家的gRPC也是用其作为通信协议。但是我们这里采用Hessian2序列化(懒,其他后续慢慢实现)
整体架构
下面就来看看实现吧
通信相关
通信协议
- 第一个是魔法数,比如我定义为0x01F1。
- 第二个代表时间戳,以便对时间进行校验
- 第三个是消息类型,如0代表请求1代表响应。
- 第四个是加密序列号,采用随机的方式对消息体加密和解密
- 第五个表示消息长度,即此后面的内容是消息content。
对应实体类如下:
编码器
也就是需要按顺序写入消息的字节到缓冲器,如下:
public class EasyRpcEncoder extends MessageToByteEncoder<RpcRemoteMsg> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, RpcRemoteMsg rpcRemoteMsg, ByteBuf out) throws Exception {
// 写入开头的标志
out.writeShort(rpcRemoteMsg.getStartSign());
// 写入秒时间戳
out.writeInt(rpcRemoteMsg.getTimeStamp());
// 写消息类型
out.writeShort(rpcRemoteMsg.getMsgType());
// 写入加密序列号
out.writeShort(rpcRemoteMsg.getEncryptSequence());
// 写入消息长度
out.writeInt(rpcRemoteMsg.getContentLength());
// 写入消息主体
out.writeBytes(rpcRemoteMsg.getContent());
}
}
解码器
这里省事,直接继承LengthFieldBasedFrameDecoder实现自己逻辑,如下:
public class EasyRpcDecoder extends LengthFieldBasedFrameDecoder {
private static final Logger log = LoggerFactory.getLogger(EasyRpcDecoder.class);
// 开始标记
private final short HEAD_START = (short) 0x01F1;
public EasyRpcDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
}
public EasyRpcDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
}
public EasyRpcDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast);
}
public EasyRpcDecoder(ByteOrder byteOrder, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
super(byteOrder, maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
// 经过父解码器的处理 我们就不需要在考虑沾包和半包了
// 当然,想要自己处理沾包和半包问题也不是不可以
ByteBuf decode = (ByteBuf) super.decode(ctx, in);
if (decode == null) {
return null;
}
// 开始标志校验 开始标志不匹配直接 过滤此条消息
short startIndex = decode.readShort();
if (startIndex != HEAD_START) {
return null;
}
// 时间戳
int timeStamp = decode.readInt();
// 请求还是响应 0:请求 1:响应
short msgType = decode.readShort();
// 加密序列号
int encryptSequence = decode.readShort();
// 消息体长度
int contentLength = decode.readInt();
// 读取消息
byte[] msgByte = new byte[contentLength];
decode.readBytes(msgByte);
// 将消息转成实体类 传递给下面的数据处理器
return msgType == 0 ? EncryptUtil.remoteMsgToRequest(encryptSequence,msgByte) : EncryptUtil.remoteMsgToResponse(encryptSequence,msgByte);
}
}
nacos相关
nacos其实很简单一共四个方法:
- registerInstance :注册
- deregisterInstance : 下线
- getAllInstances : 获取所有实例
- subscribe :订阅服务
我们nacos全局就一个使用类:
public class NacosEasyRpcCenter implements EasyRpcCenter {
private final static Logger log = LoggerFactory.getLogger(NacosEasyRpcCenter.class);
private final static String DEFAULT_NAMESPACE = "public";
private final static String DEFAULT_META_PARAM = "easyRpcMeta";
private NacosNamingService nacosNamingService;
private EasyRpcApplicationConfig applicationConfig;
private EasyRpcCenterConfig centerConfig;
public NacosEasyRpcCenter(EasyRpcCenterConfig centerConfig, EasyRpcApplicationConfig rpcApplicationConfig) {
Assert.isTrue(StrUtil.isNotEmpty(centerConfig.getAddress()), "registry address cannot be empty");
Assert.isTrue(StrUtil.isNotEmpty(centerConfig.getGroup()), "registry group cannot be empty");
this.applicationConfig = rpcApplicationConfig;
this.centerConfig = centerConfig;
Properties properties = new Properties();
properties.setProperty("serverAddr", String.format("%s:%d", centerConfig.getAddress(), centerConfig.getPort()));
properties.setProperty("namespace", StrUtil.emptyToDefault(centerConfig.getNamespace(), DEFAULT_NAMESPACE));
try {
this.nacosNamingService = new NacosNamingService(properties);
} catch (NacosException e) {
log.error("Easy-Rpc -> nacos center init error:{}", e.getErrMsg(), e);
throw new EasyRpcRunException(e.getErrMsg());
}
}
@Override
public void registerInstance(ServiceInstance serviceInstance) {
Instance instance = new Instance();
instance.setIp(serviceInstance.getIp());
instance.setPort(serviceInstance.getPort());
Map<String, String> meteData=new HashMap<>(4);
meteData.put(DEFAULT_META_PARAM,JSONObject.toJSONString(serviceInstance.getMetaDataSet()));
instance.setMetadata(meteData);
try {
nacosNamingService.registerInstance(applicationConfig.getName(), centerConfig.getGroup(),instance);
} catch (NacosException e) {
log.error("Easy-Rpc -> nacos center register error:{}", e.getErrMsg(), e);
throw new EasyRpcRunException(e.getErrMsg());
}
log.info("Easy-Rpc -> nacos center register:[ serviceName:{} group:{} ] success",applicationConfig.getName(),centerConfig.getGroup());
}
@Override
public void deregisterInstance(ServiceInstance serviceInstance) {
try {
nacosNamingService.deregisterInstance(applicationConfig.getName(), centerConfig.getGroup(),serviceInstance.getIp(),serviceInstance.getPort());
} catch (NacosException e) {
log.error("Easy-Rpc -> nacos center deregister error:{}", e.getErrMsg(), e);
}
log.info("Easy-Rpc -> nacos center deregister:[ serviceName:{} group:{} ] success",applicationConfig.getName(),centerConfig.getGroup());
}
@Override
public List<ServiceInstance> getAllInstances(String serviceId) {
List<ServiceInstance> serviceInstanceList = EasyRpcInstanceCache.getServiceInstanceList(serviceId);
if(CollectionUtil.isEmpty(serviceInstanceList)){
serviceInstanceList = new CopyOnWriteArrayList<>(new ArrayList<>(8));
try {
List<Instance> allInstances = nacosNamingService.getAllInstances(serviceId, centerConfig.getGroup());
if(CollectionUtil.isNotEmpty(allInstances)){
for(Instance instance:allInstances){
serviceInstanceList.add(new ServiceInstance(instance.getIp(), instance.getPort()));
}
}
} catch (NacosException e) {
log.error("Easy-Rpc -> nacos center deregister error:{}", e.getErrMsg(), e);
}
EasyRpcInstanceCache.updateServiceInstanceInfo(serviceId,serviceInstanceList);
}
return serviceInstanceList;
}
@Override
public void subscribeInstance(String serviceId) {
try {
nacosNamingService.subscribe(serviceId, centerConfig.getGroup(), new AbstractEventListener() {
@Override
public Executor getExecutor() {
return ThreadPoolUtils.subscribeInstancePool;
}
@Override
public void onEvent(Event event) {
NamingEvent namingEvent = (NamingEvent) event;
List<Instance> allInstances = namingEvent.getInstances();
List<ServiceInstance> serviceInstanceList = new CopyOnWriteArrayList<>(new ArrayList<>(8));
if(CollectionUtil.isNotEmpty(allInstances)){
for(Instance instance:allInstances){
serviceInstanceList.add(new ServiceInstance(instance.getIp(), instance.getPort()));
}
}
// 直接把本地的全量替换
EasyRpcInstanceCache.updateServiceInstanceInfo(serviceId,serviceInstanceList);
}
});
} catch (NacosException e) {
log.error("Easy-Rpc -> nacos center subscribe error:{}", e.getErrMsg(), e);
}
log.info("Easy-Rpc -> nacos center subscribe:[{}] success",serviceId);
}
}
消费者相关
消费者不用想,凡是这种RPC的几乎都是动态代理,问题是用什么样的方法为它生成代理,我们以Feign为例看看它是怎么做的?
启动注解→扫描包下被注解标识的接口→获取封装信息→生成FeignClientFactoryBean注入容器
感兴趣的可以自己去看看,最终我们采用注解调用时,获取的无非是**factoryBean.getObject();**返回的动态代理对象罢了
想想这样合适吗?他需要扫描整个包然后生成代理对象再放入容器,要是这个对象压根没人用岂不是白生成了? 所以我这里改了一下,不再扫描包生成了,我在你属性注入时才生成!
自定义注解
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD})
@Inherited
public @interface EasyRpcServiceInvoke {
// 服务项ID
String serviceId();
// 服务发布的Bean 名称
String beanRefName() default "";
}
此注解代表我需要获取远程调用的代理对象,如:
消费者后置处理器
所以我要在Bean属性注入的时候,为它注入一个动态代理对象,同时添加需要订阅的服务项
这个服务项在Nacos中就是服务名,就好比feign也需要指定一个服务名是吧
为什么dubbo不需要?因为dubbo在nacos不是以服务为单位,而是以暴露的接口服务为单位,如下:
代理对象生成
这里采用了Cglib动态代理,为什么不用jdk动态代理,主要是为了避免反射耗时,也方便以后拓展的灵活性
代理工厂:
public class CglibInvokeBeanProxyFactory {
/**
* @Author colins
* @Description 获取客户端远程调用代理对象
* @return T
**/
public static <T> T getClientInvokeProxy(Class<T> interfaceClass, String serviceId, String beanRef,String interfaces) throws Exception {
return (T) Enhancer.create(interfaceClass, new CglibInvocationHandler(serviceId, beanRef,interfaces));
}
}
代理处理类:
public class CglibInvocationHandler implements InvocationHandler {
private final String serviceId;
private final String beanRef;
private final String interfaces;
public CglibInvocationHandler(String serviceId, String beanRef, String interfaces) {
this.serviceId = serviceId;
this.beanRef = beanRef;
this.interfaces = interfaces;
}
@Override
public Object invoke(Object o, Method method, Object[] objects) throws Throwable {
List<ServiceInstance> serviceInstanceList = EasyRpcInstanceCache.getServiceInstanceList(serviceId);
if (CollectionUtil.isEmpty(serviceInstanceList)) {
throw new EasyRpcException(String.format("[ %d ] No corresponding service found ", serviceId));
}
// 构建请求参数
EasyRpcRequest easyRpcRequest = new EasyRpcRequest(UUID.randomUUID().toString(),beanRef, interfaces, method.getName(), method.getParameterTypes(), objects);
// 获取会话
EasyRpcSession easyRpcSession = EasyRpcSessionFactory.getInstance().openSession(serviceId, easyRpcRequest, serviceInstanceList);
// 会话执行调用
return easyRpcSession.exec();
}
}
生产者相关
自定义注解
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Inherited
@Component
public @interface EasyRpcServicePublish {
// 暴露出去的bean名称 默认就是bean默认名称
String beanRefName() default "";
}
生产者后置处理器
主要两件事:
- 收集暴露出去的bean集合
- 收集需要暴露出去的元数据
为什么要收集暴露出去的服务bean?
仔细看一下上面的注解是一个复合注解,我暴露出去提供远程调用服务的对象在容器中不就是一个bean吗?所以如果有消费者调用暴露的服务,本质不就是调用远程容器中的bean对象吗? 所以我给它收集缓存起来,要是有别的服务远程调用过来,我直接走缓存反射执行方法就好了
容器相关
容器启动
需要做三件事,启动netty server 、发布服务、订阅服务
public class EasyRpcStartEvent implements ApplicationListener<ContextRefreshedEvent> {
private EasyRpcConfig rpcConfig;
private EasyRpcCenter rpcCenter;
public EasyRpcStartEvent(EasyRpcConfig rpcConfig, EasyRpcCenter rpcCenter) {
this.rpcConfig = rpcConfig;
this.rpcCenter = rpcCenter;
}
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
// 启动NettyServer
ThreadPoolUtils.startNettyPool.execute(new EasyRpcServer(rpcConfig.getProtocol().getPort(), new EasyRpcServerHandlerInit()));
// 发布服务
rpcCenter.registerInstance(new ServiceInstance(rpcConfig.getProtocol().getPort(), EasyRpcSpringConstant.serviceMetaDataList));
// 订阅服务
EasyRpcSpringConstant.serviceIdList.forEach(item->{
rpcCenter.subscribeInstance(item);
});
}
}
容器关闭
只需要让该服务注册下线即可,如果想优雅一点,可以把Netty相关也全关闭(这里先不处理)
public class EasyRpcCloseEvent implements ApplicationListener<ContextClosedEvent> {
private EasyRpcConfig rpcConfig;
private EasyRpcCenter rpcCenter;
public EasyRpcCloseEvent(EasyRpcConfig rpcConfig, EasyRpcCenter rpcCenter) {
this.rpcConfig = rpcConfig;
this.rpcCenter = rpcCenter;
}
@Override
public void onApplicationEvent(ContextClosedEvent event) {
// 注销服务即可 其他中断无所谓
// netty server下线 client会断开重连
rpcCenter.deregisterInstance(new ServiceInstance(rpcConfig.getProtocol().getPort()));
}
}
调用相关
消费者
消费者在调用的时候本质是执行代理对象中的逻辑,也就是往netty管道中写入数据,不在多说,看看发送数据的实体类
生产者
生产者就是根据传来的信息,拿到缓存中对象的bean对象反射执行方法,然后返回
测试结果
消费端16线程,所有均为一台机器(生产、消费、jmeter都在一台机器)
测试结果仅供参考,在好的条件下,某些配置设置合适点,吞吐量应该更高
无传参 无返回 | POJO传参 简单返回 | 简单传参 POJO返回 | 无传参 POJO_list返回 | |
100并发 1000次请求 (共计10000) | 1W左右QPS | 9350左右QPS | 8900左右QPS | 7500左右QPS |
300并发 1000次请求 (共计30000) | 9200左右QPS | 8700左右QPS | 8000左右QPS | 6800左右QPS |
500并发 1000次请求 (共计50000) | 8500左右QPS | 7800左右QPS | 7800左右QPS | 6800左右QPS |
后续拓展
从以上可以看到很多东西都还没实现,只能说基本雏形搭完了,后续什么SPI机制、服务治理相关、多协议拓展、配置丰富化、多注册中心等等都是可以做的事,最后再看一下架构吧
个人博客: 全是干货,相信不会让你失望