大纲
1.RPC的相关概念
2.RPC服务调用端动态代理实现
3.Netty客户端之RPC远程调用过程分析
4.RPC网络通信中的编码解码器
5.Netty服务端之RPC服务提供端的处理
6.RPC服务调用端实现超时功能
5.Netty服务端之RPC服务提供端的处理
(1)RPC服务提供端NettyRpcServer
(2)基于反射调用请求对象的目标方法
(1)RPC服务提供端NettyRpcServer
/**
 * Describes one service hosted by the RPC server: a name, the service interface,
 * and the class that implements it. NettyRpcServer registers instances of this
 * class and NettyRpcServerHandler uses them to resolve incoming requests.
 */
public class ServiceConfig {
private String serviceName;// service name; NOTE(review): the original comment called this the caller's service name — confirm which side it identifies
private Class serviceInterfaceClass;// service interface type; NettyRpcServerHandler keys its lookup map by this class's name
private Class serviceClass;// implementation class; NettyRpcServerHandler instantiates it reflectively to serve a request
...// remaining members (constructor, accessors) are omitted in this listing
}
/**
 * Netty-based RPC server: binds a port, decodes incoming {@code RpcRequest}s,
 * dispatches them to the registered services and encodes {@code RpcResponse}s.
 */
public class NettyRpcServer {
    private static final Logger logger = LogManager.getLogger(NettyRpcServer.class);
    private static final int DEFAULT_PORT = 8998;

    /** Services exposed by this server; copy-on-write so registration is safe while handlers read it. */
    private final List<ServiceConfig> serviceConfigs = new CopyOnWriteArrayList<ServiceConfig>();
    private final int port;

    /**
     * @param port TCP port the server listens on
     */
    public NettyRpcServer(int port) {
        this.port = port;
    }

    /**
     * Starts the server and blocks the calling thread until the server channel is closed.
     * Both event loop groups are always shut down before this method returns.
     */
    public void start() {
        logger.info("Netty RPC Server Starting...");
        EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup();
        EventLoopGroup workerEventLoopGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap
                .group(bossEventLoopGroup, workerEventLoopGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // Inbound: bytes -> RpcRequest -> handler; outbound: RpcResponse -> bytes.
                        socketChannel.pipeline()
                            .addLast(new RpcDecoder(RpcRequest.class))
                            .addLast(new RpcEncoder(RpcResponse.class))
                            .addLast(new NettyRpcServerHandler(serviceConfigs));
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true);
            // Once bind(...).sync() returns, the server is up and listening on the port.
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            logger.info("Netty RPC Server started successfully, listened[" + port + "]");
            // Block here until the server channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Keep the cause in the log and restore the interrupt flag so callers can observe it.
            logger.error("Netty RPC Server failed to start, listened[" + port + "]", e);
            Thread.currentThread().interrupt();
        } finally {
            bossEventLoopGroup.shutdownGracefully();
            workerEventLoopGroup.shutdownGracefully();
        }
    }

    /**
     * Registers a service with this server; several services can be hosted at once.
     *
     * @param serviceConfig the service to expose
     */
    public void addServiceConfig(ServiceConfig serviceConfig) {
        this.serviceConfigs.add(serviceConfig);
    }

    /** Demo entry point: exposes TestService on the default port and blocks serving requests. */
    public static void main(String[] args) {
        ServiceConfig serviceConfig = new ServiceConfig("TestService", TestService.class, TestServiceImpl.class);
        NettyRpcServer nettyRpcServer = new NettyRpcServer(DEFAULT_PORT);
        nettyRpcServer.addServiceConfig(serviceConfig);
        nettyRpcServer.start();
    }
}
(2)基于反射调用请求对象的目标方法
// The fields of the RpcRequest class need to be adjusted as shown below
/**
 * Serializable description of one remote call: which method to invoke, with which
 * arguments, plus information identifying the caller.
 */
public class RpcRequest implements Serializable {
private String requestId;// request id; the server handler copies it onto the RpcResponse so the client can match the reply
private String className;// NOTE(review): presumably the target class/interface name — confirm how it relates to getServiceInterfaceClass() used by the server handler
private String methodName;// name of the method to invoke; used for the reflective method lookup on the server
private Class[] parameterTypes;// parameter types; used together with methodName for the reflective method lookup
private Object[] args;// argument values passed to the reflective invocation
private String invokerApplicationName;// service name of the caller
private String invokerIp;// IP address of the caller
...// remaining members (accessors etc.) are omitted in this listing
}
/**
 * Server-side inbound handler: resolves the service addressed by an {@code RpcRequest},
 * invokes the requested method through reflection and writes back an {@code RpcResponse}
 * that carries either the result or the failure.
 */
public class NettyRpcServerHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LogManager.getLogger(NettyRpcServerHandler.class);

    /** Registered services keyed by the fully qualified name of their service interface. */
    private final ConcurrentHashMap<String, ServiceConfig> serviceConfigMap = new ConcurrentHashMap<String, ServiceConfig>();

    /**
     * @param serviceConfigs services this handler may dispatch to; indexed by interface class name
     */
    public NettyRpcServerHandler(List<ServiceConfig> serviceConfigs) {
        for (ServiceConfig serviceConfig : serviceConfigs) {
            String serviceInterfaceClass = serviceConfig.getServiceInterfaceClass().getName();
            serviceConfigMap.put(serviceInterfaceClass, serviceConfig);
        }
    }

    /**
     * Handles one decoded request. Every request gets a response: failures are reported
     * through the response's failure status and exception instead of being thrown.
     *
     * @param ctx channel context used to write the response
     * @param msg the decoded message; must be an {@code RpcRequest}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        RpcRequest rpcRequest = (RpcRequest) msg;
        logger.info("Netty RPC Server receives the request: " + rpcRequest);
        RpcResponse rpcResponse = new RpcResponse();
        rpcResponse.setRequestId(rpcRequest.getRequestId());
        try {
            // Steps: find the implementation class registered for the requested interface,
            // instantiate it, look up the method by name and parameter types, invoke it.
            // NOTE(review): the RpcRequest listing declares className but no
            // serviceInterfaceClass field — confirm this getter exists.
            ServiceConfig serviceConfig = serviceConfigMap.get(rpcRequest.getServiceInterfaceClass());
            if (serviceConfig == null) {
                // Report an unknown service explicitly instead of failing with an anonymous NPE.
                throw new ClassNotFoundException(
                    "No service registered for interface: " + rpcRequest.getServiceInterfaceClass());
            }
            Class<?> clazz = serviceConfig.getServiceClass();
            // Class.newInstance() is deprecated; the no-arg constructor is invoked explicitly.
            Object instance = clazz.getDeclaredConstructor().newInstance();
            Method method = clazz.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
            Object result = method.invoke(instance, rpcRequest.getArgs());
            // Put the call result into the response.
            rpcResponse.setResult(result);
            rpcResponse.setSuccess(RpcResponse.SUCCESS);
        } catch (Exception e) {
            // Reflection wraps exceptions thrown by the target in InvocationTargetException;
            // unwrap so the client receives the service's own exception.
            Exception failure = e;
            if (e instanceof java.lang.reflect.InvocationTargetException && e.getCause() instanceof Exception) {
                failure = (Exception) e.getCause();
            }
            logger.error("Netty RPC Server failed to response the request.", failure);
            rpcResponse.setSuccess(RpcResponse.FAILURE);
            rpcResponse.setException(failure);
        }
        ctx.writeAndFlush(rpcResponse);
        logger.info("send RPC response to client: " + rpcResponse);
    }
}
6.RPC服务调用端实现超时功能
/**
 * Client-side configuration for one remote service reference: which interface is
 * being called, where the server is, and how long to wait for a response.
 */
public class ReferenceConfig {
private static final long DEFAULT_TIMEOUT = 5000;// NOTE(review): presumably the default for timeout, in milliseconds — its use is not shown in this listing
private static final String DEFAULT_SERVICE_HOST = "127.0.0.1";// NOTE(review): presumably the default for serviceHost — its use is not shown in this listing
private static final int DEFAULT_SERVICE_PORT = 8998;// same value as NettyRpcServer.DEFAULT_PORT; presumably the default for servicePort
private Class serviceInterfaceClass;// interface of the remote service being referenced
private String serviceHost;// host NettyRpcClient connects to
private int servicePort;// port NettyRpcClient connects to
private long timeout;// response timeout; the client handlers compare it against millisecond timestamps
...// remaining members (constructors, accessors) are omitted in this listing
}
/**
 * Netty-based RPC client for a single {@code ReferenceConfig}: connects to the
 * configured server, sends {@code RpcRequest}s and waits for the matching
 * {@code RpcResponse} within the configured timeout.
 */
public class NettyRpcClient {
    private static final Logger logger = LogManager.getLogger(NettyRpcClient.class);

    private ReferenceConfig referenceConfig;
    private ChannelFuture channelFuture;
    private NettyRpcClientHandler nettyRpcClientHandler;
    /** Event loop backing the connection; kept so it can be released in {@link #close()}. */
    private EventLoopGroup eventLoopGroup;

    /**
     * @param referenceConfig target host/port and timeout for this client
     */
    public NettyRpcClient(ReferenceConfig referenceConfig) {
        this.referenceConfig = referenceConfig;
        this.nettyRpcClientHandler = new NettyRpcClientHandler(referenceConfig.getTimeout());
    }

    /**
     * Connects to the configured server and blocks until the connection is established.
     * If no host is configured, nothing is connected (as before) and no event loop is created.
     *
     * @throws RuntimeException wrapping the cause if the connection attempt fails
     */
    public void connect() {
        logger.info("connecting to Netty RPC server: " + referenceConfig.getServiceHost() + ":" + referenceConfig.getServicePort());
        String host = referenceConfig.getServiceHost();
        if (host == null || host.equals("")) {
            // Check before allocating the event loop so no threads are leaked on this path.
            logger.warn("no service host configured, skip connecting.");
            return;
        }
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap
            .group(group)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.SO_KEEPALIVE, true) // send a probe packet when the connection has been idle for a long time
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    // Outbound: RpcRequest -> bytes; inbound: bytes -> RpcResponse -> timeout marking -> client handler.
                    socketChannel.pipeline()
                        .addLast(new RpcEncoder(RpcRequest.class))
                        .addLast(new RpcDecoder(RpcResponse.class))
                        .addLast(new NettyRpcReadTimeoutHandler(referenceConfig.getTimeout()))
                        .addLast(nettyRpcClientHandler);
                }
            });
        try {
            channelFuture = bootstrap.connect(host, referenceConfig.getServicePort()).sync();
            eventLoopGroup = group;
            logger.info("successfully connected.");
        } catch (Exception e) {
            // Release the event loop threads; without this a failed connect leaks them.
            group.shutdownGracefully();
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Sends the request and waits for its response.
     *
     * @param rpcRequest the request to send; its request id is used to match the response
     * @return the response when the call succeeded
     * @throws IllegalStateException if the client is not connected
     * @throws Throwable the exception carried by a failed response, or whatever waiting for the response throws
     */
    public RpcResponse remoteCall(RpcRequest rpcRequest) throws Throwable {
        if (channelFuture == null) {
            throw new IllegalStateException("Netty RPC client is not connected, call connect() first.");
        }
        // Record when the request was issued so the read-timeout handler can measure the round trip.
        NettyRpcRequestTimeHolder.put(rpcRequest.getRequestId(), System.currentTimeMillis());
        channelFuture.channel().writeAndFlush(rpcRequest).sync();
        RpcResponse rpcResponse = nettyRpcClientHandler.getRpcResponse(rpcRequest.getRequestId());
        logger.info("receives response from netty rpc server.");
        if (rpcResponse.isSuccess()) {
            return rpcResponse;
        }
        throw rpcResponse.getException();
    }

    /**
     * Closes the connection, if any, and releases the event loop threads.
     * Safe to call when the client never connected.
     */
    public void close() {
        if (channelFuture != null) {
            channelFuture.channel().close();
        }
        if (eventLoopGroup != null) {
            eventLoopGroup.shutdownGracefully();
        }
    }
}
/**
 * Client-side inbound handler that flags responses arriving later than the timeout.
 * It compares the arrival time with the request time recorded in
 * {@code NettyRpcRequestTimeHolder}, marks late responses as timed out, removes the
 * recorded time and passes the response on to the next handler.
 */
public class NettyRpcReadTimeoutHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LogManager.getLogger(NettyRpcReadTimeoutHandler.class);

    /** Maximum allowed time between request and response, in milliseconds. */
    private final long timeout;

    /**
     * @param timeout allowed round-trip time in milliseconds
     */
    public NettyRpcReadTimeoutHandler(long timeout) {
        this.timeout = timeout;
    }

    /**
     * @param ctx channel context used to forward the response
     * @param msg the decoded message; must be an {@code RpcResponse}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        RpcResponse rpcResponse = (RpcResponse) msg;
        // Boxed on purpose: if the holder has no entry for this id (NOTE(review): assumes get()
        // returns null in that case — confirm), unboxing straight to long would throw an NPE.
        Long requestTime = NettyRpcRequestTimeHolder.get(rpcResponse.getRequestId());
        if (requestTime == null) {
            // No outstanding request is tracked for this response; mark it as timed out
            // so that downstream handling discards it.
            rpcResponse.setTimeout(true);
            logger.error("Netty RPC response has no recorded request time, marked as timeout status: " + rpcResponse);
        } else if (System.currentTimeMillis() - requestTime >= timeout) {
            rpcResponse.setTimeout(true);
            logger.error("Netty RPC response is marked as timeout status: " + rpcResponse);
        }
        // Drop the recorded request time now that the response has been seen.
        NettyRpcRequestTimeHolder.remove(rpcResponse.getRequestId());
        ctx.fireChannelRead(rpcResponse);
    }
}
/**
 * Client-side inbound handler that stores received responses by request id and
 * lets calling threads wait for them, polling until the timeout elapses.
 */
public class NettyRpcClientHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LogManager.getLogger(NettyRpcClientHandler.class);

    /** Pause between two polls of the response map, in milliseconds. */
    private static final long GET_RPC_RESPONSE_SLEEP_INTERVAL = 5;

    /** Responses received but not yet collected, keyed by request id. */
    private final ConcurrentHashMap<String, RpcResponse> rpcResponses = new ConcurrentHashMap<String, RpcResponse>();

    /** Maximum time to wait for a response, in milliseconds. */
    private final long timeout;

    /**
     * @param timeout maximum time {@link #getRpcResponse(String)} waits, in milliseconds
     */
    public NettyRpcClientHandler(long timeout) {
        this.timeout = timeout;
    }

    /**
     * Stores an incoming response for its waiting caller; responses already flagged
     * as timed out are logged and discarded.
     *
     * @param ctx channel context (unused)
     * @param msg the decoded message; must be an {@code RpcResponse}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        RpcResponse rpcResponse = (RpcResponse) msg;
        if (rpcResponse.getTimeout()) {
            logger.error("Netty RPC client receives the response timeout: " + rpcResponse);
        } else {
            rpcResponses.put(rpcResponse.getRequestId(), rpcResponse);
            logger.info("Netty RPC client receives the response: " + rpcResponse);
        }
    }

    /**
     * Waits for the response to the given request and removes it from the pending map.
     *
     * @param requestId id of the request whose response is awaited
     * @return the response for {@code requestId}
     * @throws NettyRpcReadTimeoutException if no response arrives within the timeout, or the
     *         waiting thread is interrupted first (the interrupt flag is restored in that case)
     */
    public RpcResponse getRpcResponse(String requestId) throws NettyRpcReadTimeoutException {
        long waitStartTime = System.currentTimeMillis();
        boolean interrupted = false;
        while (rpcResponses.get(requestId) == null) {
            if (System.currentTimeMillis() - waitStartTime >= timeout) {
                break;
            }
            try {
                Thread.sleep(GET_RPC_RESPONSE_SLEEP_INTERVAL);
            } catch (InterruptedException e) {
                // Stop waiting and restore the flag: continuing to poll would swallow the
                // interrupt and ignore the caller's request to stop.
                logger.error("wait for response interrupted", e);
                Thread.currentThread().interrupt();
                interrupted = true;
                break;
            }
        }
        // A single remove both fetches and clears the entry.
        RpcResponse rpcResponse = rpcResponses.remove(requestId);
        if (rpcResponse == null) {
            String reason = interrupted ? "Interrupted while waiting for RPC response." : "Get RPC response timeout.";
            logger.error(reason);
            throw new NettyRpcReadTimeoutException(reason);
        }
        return rpcResponse;
    }
}