本文跟着MY-RPC-FRamework的代码,根据自己的理解做的笔记,先理解,再学习。
RPC原理
客户端和服务端都可以访问到通用的接口,但是只有服务端有这个接口的实现类,客户端调用这个接口的方式,是通过网络传输,告诉服务端我要调用这个接口,服务端收到之后找到这个接口的实现类,并且执行,将执行的结果返回给客户端,作为客户端调用接口方法的返回值。
简单实现
1. 通用接口
这个接口就是客户端需要调用的接口,也是服务端需要实现的接口,这里只定义了一个接口 hello。
public interface HelloService {
String hello(HelloObject object);
}
@Data
@AllArgsConstructor
public class HelloObject implements Serializable {
private Integer id;
private String message;
}
注意这个HelloObject 对象需要实现Serializable接口,因为它需要在调用过程中从客户端传递给服务端。实现序列化的方式很多,这里使用了JDK自带的序列化方式,后期可以改进。
有了这个公共接口,就可以先在服务端将其实现
public class HelloServiceImpl implements HelloService {
private static final Logger logger = LoggerFactory.getLogger(HelloServiceImpl.class);
@Override
public String hello(HelloObject object) {
logger.info("接收到:{}", object.getMessage());
return "这是掉用的返回值,id=" + object.getId();
}
}
传输协议
现在,思考客户端需要传输哪些信息给服务端,服务端才能知道调用哪个接口的哪个方法呢?
- 接口的名字
- 方法的名字
- 方法的参数
- 每个参数的类型
服务端知道以上四个条件,就可以找到这个方法并且调用了。我们把这四个条件写到一个对象里,到时候传输时传输这个对象就行了
@Data
@Builder
public class RpcRequest implements Serializable {
/**
* 待调用接口名称
*/
private String interfaceName;
/**
* 待调用方法名称
*/
private String methodName;
/**
* 调用方法的参数
*/
private Object[] parameters;
/**
* 调用方法的参数类型
*/
private Class<?>[] paramTypes;
}
服务器调用完这个方法后,需要给客户端返回哪些信息呢?如果调用成功的话,显然需要返回值,如果调用失败了,就需要失败的信息,这里封装成一个RpcResponse对象:
@Data
public class RpcResponse<T> implements Serializable {
/**
* 响应状态码
*/
private Integer statusCode;
/**
* 响应状态补充信息
*/
private String message;
/**
* 响应数据
*/
private T data;
public static <T> RpcResponse<T> success(T data) {
RpcResponse<T> response = new RpcResponse<>();
response.setStatusCode(ResponseCode.SUCCESS.getCode());
response.setData(data);
return response;
}
public static <T> RpcResponse<T> fail(ResponseCode code) {
RpcResponse<T> response = new RpcResponse<>();
response.setStatusCode(code.getCode());
response.setMessage(code.getMessage());
return response;
}
}
客户端实现–动态代理
需要注意,前面我们提过,rpc客户端是没有接口的实现的,只有接口HelloService ,没有办法生成实例对象。这时,我们可以通过动态代理的方式生成实例,并且调用方法(代理方法invoke)时生成需要的RpcRequest对象并且发送给服务端。
传递host和port来指明服务端的位置。并且使用getProxy()方法来生成代理对象
public class RpcClientProxy implements InvocationHandler {
private static final Logger logger = LoggerFactory.getLogger(RpcClientProxy.class);
private String host;
private int port;
public RpcClientProxy(String host, int port) {
this.host = host;
this.port = port;
}
@SuppressWarnings("unchecked")
public <T> T getProxy(Class<T> clazz) {
return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
logger.info("调用方法: {}#{}", method.getDeclaringClass().getName(), method.getName());
RpcRequest rpcRequest = RpcRequest.builder()
.interfaceName(method.getDeclaringClass().getName())
.methodName(method.getName())
.parameters(args)
.paramTypes(method.getParameterTypes())
.build();
RpcClient rpcClient = new RpcClient();
return rpcClient.sendRequest(rpcRequest, host, port);
}
}
发送的逻辑使用了一个RpcClient对象来实现,这个对象的作用,就是将一个对象发过去,并且接受返回的对象。
public class RpcClient {
private static final Logger logger = LoggerFactory.getLogger(RpcClient.class);
public Object sendRequest(RpcRequest rpcRequest, String host, int port) {
try (Socket socket = new Socket(host, port)) {
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
objectOutputStream.writeObject(rpcRequest);
objectOutputStream.flush();
RpcResponse rpcResponse = (RpcResponse) objectInputStream.readObject();
if(rpcResponse == null) {
logger.error("服务调用失败,service:{}", rpcRequest.getInterfaceName());
throw new RpcException(RpcError.SERVICE_INVOCATION_FAILURE, " service:" + rpcRequest.getInterfaceName());
}
if(rpcResponse.getStatusCode() == null || rpcResponse.getStatusCode() != ResponseCode.SUCCESS.getCode()) {
logger.error("调用服务失败, service: {}, response:{}", rpcRequest.getInterfaceName(), rpcResponse);
throw new RpcException(RpcError.SERVICE_INVOCATION_FAILURE, " service:" + rpcRequest.getInterfaceName());
}
return rpcResponse.getData();
} catch (IOException | ClassNotFoundException e) {
logger.error("调用时有错误发生:", e);
throw new RpcException("服务调用失败: ", e);
}
}
}
服务端的实现——反射调用
服务端使用一个ServerSocket监听某个端口,循环接收连接请求,如果发来了请求就创建一个线程,在新线程中处理调用。这里创建线程采用线程池:
public class RpcServer {
private final ExecutorService threadPool;
private static final Logger logger = LoggerFactory.getLogger(RpcServer.class);
public RpcServer() {
int corePoolSize = 5;
int maximumPoolSize = 50;
long keepAliveTime = 60;
BlockingQueue<Runnable> workingQueue = new ArrayBlockingQueue<>(100);
ThreadFactory threadFactory = Executors.defaultThreadFactory();
threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, workingQueue, threadFactory);
}
}
RpcServer暂时只能注册一个接口,即对外提供一个接口的调用服务,添加register方法,在注册完一个服务后立刻开始监听
public void register(Object service, int port) {
try (ServerSocket serverSocket = new ServerSocket(port)) {
logger.info("服务器正在启动...");
Socket socket;
while((socket = serverSocket.accept()) != null) {
logger.info("客户端连接!Ip为:" + socket.getInetAddress());
threadPool.execute(new WorkerThread(socket, service));
}
} catch (IOException e) {
logger.error("连接时有错误发生:", e);
}
}
WorkerThread实现了Runnable接口,用于接收RpcRequest对象,解析并且调用,生成RpcResponse对象并传输回去
@Override
public void run() {
try (ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) {
RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
Method method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes());
Object returnObject = method.invoke(service, rpcRequest.getParameters());
objectOutputStream.writeObject(RpcResponse.success(returnObject));
objectOutputStream.flush();
} catch (IOException | ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
logger.error("调用或发送时有错误发生:", e);
}
}
测试
服务端侧,我们已经在上面实现了一个HelloService的实现类HelloServiceImpl的实现类了,我们只需要创建一个RpcServer并且把这个实现类注册进去就行了
public class TestServer {
public static void main(String[] args) {
HelloService helloService = new HelloServiceImpl();
RpcServer rpcServer = new RpcServer();
rpcServer.register(helloService, 9000);
}
}`
客户端方面,我们需要通过动态代理,生成代理对象,并且调用,动态代理会自动帮我们向服务端发送请求的
public class TestClient {
public static void main(String[] args) {
RpcClientProxy proxy = new RpcClientProxy("127.0.0.1", 9000);
HelloService helloService = proxy.getProxy(HelloService.class);
HelloObject object = new HelloObject(12, "This is a message");
String res = helloService.hello(object);
System.out.println(res);
}
}
改进,v1.0版本下载之后就是改进的版本
改进点:
- 核心就是支持注册多个接口,对应ServiceRegistry接口,使用Map保存已经注册的服务。
- RequestHandler:真实调用服务的类,并将结果返回
- RequestHandlerThread:和前面的WorkerThread类似。