文章目录
- Pre
- RPC服务实现
- 服务注册
- 请求处理
- 设计: 请求分发机制
Pre
Simple RPC - 01 框架原理及总体架构初探
Simple RPC - 02 通用高性能序列化和反序列化设计与实现
Simple RPC - 03 借助Netty实现异步网络通信
Simple RPC - 04 从零开始设计一个客户端(上)
Simple RPC - 05 从零开始设计一个客户端(下)_ 依赖倒置和SPI
RPC服务实现
服务端的RPC服务主要包含两个核心功能:
-
服务注册:将服务的实现类注册到RPC框架中。
-
请求处理:接收并处理来自客户端的RPC请求。
服务注册
服务注册通过一个Map<String, Object>
结构来实现,其中Key
为服务名,Value
为服务实现类的实例。
把服务的实现类注册到 RPC 框架中,这个逻辑的实现很简单,我们只要使用一个合适的数据结构,记录下所有注册的实例就可以了,后面在处理客户端请求的时候,会用到这个数据结构来查找服务实例
@Singleton
public class RpcRequestHandler implements RequestHandler, ServiceProviderRegistry {
private final Map<String, Object> serviceProviders = new ConcurrentHashMap<>();
@Override
public synchronized <T> void addServiceProvider(Class<? extends T> serviceClass, T serviceProvider) {
serviceProviders.put(serviceClass.getCanonicalName(), serviceProvider);
}
}
请求处理
首先来看服务端中,使用 Netty 接收所有请求数据的处理类 RequestInvocation
的 channelRead0
方法
/**
* 处理通道读取事件
*
* @param channelHandlerContext 上下文处理器,用于管理通道事件
* @param request 入站请求命令
* @throws Exception 如果没有找到处理请求的处理器或者其他异常
*/
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Command request) throws Exception {
// 根据请求头的类型获取对应的请求处理器
RequestHandler handler = requestHandlerRegistry.get(request.getHeader().getType());
if(null != handler) {
// 处理请求并获取响应
Command response = handler.handle(request);
if(null != response) {
// 将响应写入并刷新通道上下文,并添加监听器处理写入结果
channelHandlerContext.writeAndFlush(response).addListener((ChannelFutureListener) channelFuture -> {
if (!channelFuture.isSuccess()) {
// 如果写入响应失败,记录日志并关闭通道
logger.warn("Write response failed!", channelFuture.cause());
channelHandlerContext.channel().close();
}
});
} else {
// 如果响应为空,记录警告日志
logger.warn("Response is null!");
}
} else {
// 如果没有找到处理请求的处理器,抛出异常
throw new Exception(String.format("No handler for request with type: %d!", request.getHeader().getType()));
}
}
根据请求命令的 Hdader 中的请求类型 type,去 requestHandlerRegistry 中查找对应的请求处理器 RequestHandler,然后调用请求处理器去处理请求,最后把结果发送给客户端
这种通过“请求中的类型”,把请求分发到对应的处理类或者处理方法的设计,在服务端处理请求的场景中,这是一个很常用的方法。我们这里使用的也是同样的设计,不同的是,我们使用了一个命令注册机制,让这个路由分发的过程省略了大量的 if-else 或者是 switch 代码。这样做的好处是,可以很方便地扩展命令处理器,而不用修改路由分发的方法,并且代码看起来更加优雅。
/**
* 请求处理器注册类,用于管理和获取不同的请求处理器
* 该类通过 Singleton 模式确保只有一个实例,
* 并在类加载时初始化所有已知的请求处理器
* @author artisan
*/
public class RequestHandlerRegistry {
private static final Logger logger = LoggerFactory.getLogger(RequestHandlerRegistry.class);
/**
* 存储请求处理器的映射,键为处理器类型,值为处理器实例
*/
private Map<Integer, RequestHandler> handlerMap = new HashMap<>();
/**
* Singleton 实例
*/
private static RequestHandlerRegistry instance = null;
/**
* 获取 RequestHandlerRegistry 的单例实例
* 如果实例不存在,则创建一个新的实例
*
* @return RequestHandlerRegistry 的单例实例
*/
public static RequestHandlerRegistry getInstance() {
if (null == instance) {
instance = new RequestHandlerRegistry();
}
return instance;
}
/**
* 私有构造方法,用于在实例创建时加载所有请求处理器
* 通过 ServiceSupport 加载所有实现 RequestHandler 接口的实例,
* 并将它们添加到 handlerMap 中
*/
private RequestHandlerRegistry() {
Collection<RequestHandler> requestHandlers = ServiceSupport.loadAll(RequestHandler.class);
for (RequestHandler requestHandler : requestHandlers) {
handlerMap.put(requestHandler.type(), requestHandler);
logger.info("Load request handler, type: {}, class: {}.", requestHandler.type(), requestHandler.getClass().getCanonicalName());
}
}
/**
* 根据请求类型获取对应的请求处理器
*
* @param type 请求处理器的类型
* @return 对应的请求处理器实例,如果不存在则返回 null
*/
public RequestHandler get(int type) {
return handlerMap.get(type);
}
}
接下来,我们看下通过RpcRequestHandler
处理具体的客户端RPC请求
/**
* 处理请求命令的方法
* 该方法的主要职责是解析请求命令,查找并调用相应服务,然后返回处理结果
*
* @param requestCommand 请求命令,包含服务调用信息和参数
* @return 返回命令,包含调用结果或错误信息
*/
@Override
public Command handle(Command requestCommand) {
Header header = requestCommand.getHeader();
// 从payload中反序列化RpcRequest,获取服务调用请求的具体信息
RpcRequest rpcRequest = SerializeSupport.parse(requestCommand.getPayload());
try {
// 根据rpcRequest中的服务名,查找已注册的服务提供方
Object serviceProvider = serviceProviders.get(rpcRequest.getInterfaceName());
if(serviceProvider != null) {
// 服务提供者存在,反序列化参数并获取具体方法进行调用
String arg = SerializeSupport.parse(rpcRequest.getSerializedArguments());
// 通过反射调用服务方法
Method method = serviceProvider.getClass().getMethod(rpcRequest.getMethodName(), String.class);
String result = (String ) method.invoke(serviceProvider, arg);
// 将调用结果序列化并封装成响应命令返回
return new Command(new ResponseHeader(type(), header.getVersion(), header.getRequestId()), SerializeSupport.serialize(result));
}
// 未找到对应的服务提供方,返回错误响应
logger.warn("No service Provider of {}#{}(String)!", rpcRequest.getInterfaceName(), rpcRequest.getMethodName());
return new Command(new ResponseHeader(type(), header.getVersion(), header.getRequestId(), Code.NO_PROVIDER.getCode(), "No provider!"), new byte[0]);
} catch (Throwable t) {
// 处理过程中发生异常,记录并返回错误响应
logger.warn("Exception: ", t);
return new Command(new ResponseHeader(type(), header.getVersion(), header.getRequestId(), Code.UNKNOWN_ERROR.getCode(), t.getMessage()), new byte[0]);
}
}
核心流程如下:
- 把
requestCommand
的payload
属性反序列化成为RpcRequest
; - 根据
rpcRequest
中的服务名,去成员变量serviceProviders
中查找已注册服务实现类的实例; - 找到服务提供者之后,利用 Java 反射机制调用服务的对应方法;
- 把结果封装成响应命令并返回,在
RequestInvocation
中,它会把这个响应命令发送给客户端。
再来看成员变量 serviceProviders
,它的定义是:Map<String/service name/, Object/service provider/> serviceProviders
。
它实际上就是一个 Map,Key 就是服务名,Value 就是服务提供方,也就是服务实现类的实例。这个 Map 的数据从哪儿来的呢?
我们来看一下 RpcRequestHandler
这个类的定义
@Singleton
public class RpcRequestHandler implements RequestHandler, ServiceProviderRegistry {
/**
* 同步地向服务提供者集合中添加一个新的服务提供者
* 此方法为类的公共接口一部分,提供了一种向内部服务提供者映射添加新服务提供者的方式
*
* @param serviceClass 服务接口类,指定了服务提供者的类型期望
* @param serviceProvider 实际的服务提供者实例,实现了指定的服务接口
*
* 选择使用同步方法以确保线程安全,因为在多线程环境中,
* 可能会有多个线程尝试同时向服务提供者集合中添加服务
*
* 使用服务提供者的类的规范名称(canonical name)作为键进行存储,是为了确保
* 通过类名唯一标识服务提供者,避免了由于类加载器差异导致的潜在问题
*
* 记录日志是为了在系统运行时能够追踪到服务提供者的添加操作,有助于调试和系统维护
*/
@Override
public synchronized <T> void addServiceProvider(Class<? extends T> serviceClass, T serviceProvider) {
serviceProviders.put(serviceClass.getCanonicalName(), serviceProvider);
logger.info("Add service: {}, provider: {}.",
serviceClass.getCanonicalName(),
serviceProvider.getClass().getCanonicalName());
}
}
这个类不仅实现了处理客户端请求的 RequestHandler
接口,同时还实现了注册 RPC 服务 ServiceProviderRegistry
接口,也就是说,RPC 框架服务端需要实现的两个功能——注册 RPC 服务和处理客户端 RPC 请求,都是在这一个类 RpcRequestHandler
中实现的,所以说,这个类是这个 RPC 框架服务端最核心的部分。
成员变量 serviceProviders
这个 Map 中的数据,也就是在 addServiceProvider
这个方法的实现中添加进去的
@Singleton
RpcRequestHandler
上增加了一个注解 @Singleton
,限定这个类它是一个单例模式,这样确保在进程中任何一个地方,无论通过 ServiceSupport
获取 RequestHandler
或者 ServiceProviderRegistry
这两个接口的实现类,拿到的都是 RpcRequestHandler
这个类的唯一的一个实例。 实现如下
/**
* 提供服务加载功能的支持类,特别是处理单例服务
* @author artisan
*/
public class ServiceSupport {
/**
* 存储单例服务的映射,确保每个服务只有一个实例
*/
private final static Map<String, Object> singletonServices = new HashMap<>();
/**
* 加载单例服务实例
*
* @param service 服务类的Class对象
* @param <S> 服务类的类型参数
* @return 单例服务实例
* @throws ServiceLoadException 如果找不到服务实例
*/
public synchronized static <S> S load(Class<S> service) {
return StreamSupport.
stream(ServiceLoader.load(service).spliterator(), false)
.map(ServiceSupport::singletonFilter)
.findFirst().orElseThrow(ServiceLoadException::new);
}
/**
* 加载所有服务实例
*
* @param service 服务类的Class对象
* @param <S> 服务类的类型参数
* @return 所有服务实例的集合
*/
public synchronized static <S> Collection<S> loadAll(Class<S> service) {
return StreamSupport.
stream(ServiceLoader.load(service).spliterator(), false)
.map(ServiceSupport::singletonFilter).collect(Collectors.toList());
}
/**
* 对服务实例进行单例过滤
*
* @param service 服务实例
* @param <S> 服务类的类型参数
* @return 单例过滤后的服务实例,如果该服务是单例的并且已有实例存在,则返回已存在的实例
*/
@SuppressWarnings("unchecked")
private static <S> S singletonFilter(S service) {
if(service.getClass().isAnnotationPresent(Singleton.class)) {
String className = service.getClass().getCanonicalName();
Object singletonInstance = singletonServices.putIfAbsent(className, service);
return singletonInstance == null ? service : (S) singletonInstance;
} else {
return service;
}
}
}
设计: 请求分发机制
请求分发分为两个层次:
-
网络传输层:通过
RequestInvocation
类,根据请求类型分发到对应的请求处理器。根据请求命令中的请求类型 (command.getHeader().getType()),分发到对应的请求处理器 RequestHandler 中
-
业务逻辑层:通过
RpcRequestHandler
类,根据服务名分发到具体的服务实现类。根据 RPC 请求中的服务名,把 RPC 请求分发到对应的服务实现类的实例中去
这种分层设计的目的在于保持系统的松耦合和高内聚,确保网络传输与业务逻辑的清晰分离。