手撕RPC——实现简单的RPC调用
- 一、场景设计
- 二、设计思路
- 2.1 客户端的设计
- 2.2 服务端的设计
- 2.3 通信设计
- 三、代码实现
- 3.1 定义用户信息
- 3.2 用户服务接口
- 3.3 用户服务接口实现
- 3.4 定义消息格式
- 3.5 实现动态代理类
- 3.6 封装信息传输类
- 3.7 定义服务端Server接口
- 3.8 实现RpcServer接口
- 3.9 实现WorkThread类
- 3.10 实现本地服务存放器
- 3.11 客户端主程序
- 3.12 服务端主程序
一、场景设计
现在A,B位于不同的服务器上,但现在A想调用B的某个方法,如何实现呢?
服务端B:
有一个用户表
- UserService 里有一个功能:getUserByUserId(Integer id)
- UserServiceImpl 实现了UserService接口和方法
客户端A:
调用getUserByUserId方法, 内部传一个Id给服务端,服务端查询到User对象返回给客户端
如何实现以上调用过程呢?
二、设计思路
主要考虑客户端、服务端、以及双方如何通信才能实现此功能
2.1 客户端的设计
- 调用getUserByUserId方法时,内部将调用信息处理后发送给服务端B,告诉B我要获取User
- 外部调用方法,内部进行其它的处理——这种场景我们可以使用动态代理的方式,改写原本方法的处理逻辑
2.2 服务端的设计
- 监听到A的请求后,接收A的调用信息,并根据信息得到A想调用的服务与方法
- 根据信息找到对应的服务,进行调用后将结果发送回给A
2.3 通信设计
- 使用Java的socket网络编程进行通信
- 为了方便A ,B之间 对接收的消息进行处理,我们需要将请求信息和返回信息封装成统一的消息格式
三、代码实现
在此部分我们将理论转化为代码,分别实现客户端和服务端。
项目目录结构
3.1 定义用户信息
@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {
// 客户端和服务端共有的
private Integer id;
private String userName;
private Boolean sex;
}
3.2 用户服务接口
public interface UserService {
// 客户端通过这个接口调用服务端的实现类
User getUserByUserId(Integer id);
//新增一个功能
Integer insertUserId(User user);
}
3.3 用户服务接口实现
public class UserServiceImpl implements UserService {
@Override
public User getUserByUserId(Integer id) {
System.out.println("客户端查询了"+id+"的用户");
// 模拟从数据库中取用户的行为
Random random = new Random();
User user = User.builder().userName(UUID.randomUUID().toString())
.id(id)
.sex(random.nextBoolean()).build();
return user;
}
@Override
public Integer insertUserId(User user) {
System.out.println("插入数据成功"+user.getUserName());
return user.getId();
}
}
3.4 定义消息格式
//定义请求信息格式RpcRequest
@Data
@Builder
public class RpcRequest implements Serializable {
//服务类名,客户端只知道接口
private String interfaceName;
//调用的方法名
private String methodName;
//参数列表
private Object[] params;
//参数类型
private Class<?>[] paramsType;
}
//定义返回信息格式RpcResponse(类似http格式)
@Data
@Builder
public class RpcResponse implements Serializable {
//状态码
private int code;
//状态信息
private String message;
//具体数据
private Object data;
//构造成功信息
public static RpcResponse sussess(Object data){
return RpcResponse.builder().code(200).data(data).build();
}
//构造失败信息
public static RpcResponse fail(){
return RpcResponse.builder().code(500).message("服务器发生错误").build();
}
}
3.5 实现动态代理类
@AllArgsConstructor
public class ClientProxy implements InvocationHandler {
//传入参数service接口的class对象,反射封装成一个request
private String host;
private int port;
//jdk动态代理,每一次代理对象调用方法,都会经过此方法增强(反射获取request对象,socket发送到服务端)
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//构建request
RpcRequest request=RpcRequest.builder()
.interfaceName(method.getDeclaringClass().getName())
.methodName(method.getName())
.params(args).paramsType(method.getParameterTypes()).build();
//IOClient.sendRequest 和服务端进行数据传输
RpcResponse response= IOClient.sendRequest(host,port,request);
return response.getData();
}
public <T>T getProxy(Class<T> clazz){
Object o = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this);
return (T)o;
}
}
3.6 封装信息传输类
public class IOClient {
//这里负责底层与服务端的通信,发送request,返回response
public static RpcResponse sendRequest(String host, int port, RpcRequest request){
try {
Socket socket=new Socket(host, port);
ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream ois=new ObjectInputStream(socket.getInputStream());
oos.writeObject(request);
oos.flush();
RpcResponse response=(RpcResponse) ois.readObject();
return response;
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
return null;
}
}
}
3.7 定义服务端Server接口
public interface RpcServer {
//开启监听
void start(int port);
void stop();
}
3.8 实现RpcServer接口
@AllArgsConstructor
public class SimpleRPCRPCServer implements RpcServer {
private ServiceProvider serviceProvide;
@Override
public void start(int port) {
try {
ServerSocket serverSocket=new ServerSocket(port);
System.out.println("服务器启动了");
while (true) {
//如果没有连接,会堵塞在这里
Socket socket = serverSocket.accept();
//有连接,创建一个新的线程执行处理
new Thread(new WorkThread(socket,serviceProvide)).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void stop() {
}
}
3.9 实现WorkThread类
WorkThread类负责启动线程和客户端进行数据传输,WorkThread类中的getResponse方法负责解析收到的request信息,寻找服务进行调用并返回结果。
@AllArgsConstructor
public class WorkThread implements Runnable{
private Socket socket;
private ServiceProvider serviceProvide;
@Override
public void run() {
try {
ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream ois=new ObjectInputStream(socket.getInputStream());
//读取客户端传过来的request
RpcRequest rpcRequest = (RpcRequest) ois.readObject();
//反射调用服务方法获取返回值
RpcResponse rpcResponse=getResponse(rpcRequest);
//向客户端写入response
oos.writeObject(rpcResponse);
oos.flush();
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
}
}
private RpcResponse getResponse(RpcRequest rpcRequest){
//得到服务名
String interfaceName=rpcRequest.getInterfaceName();
//得到服务端相应服务实现类
Object service = serviceProvide.getService(interfaceName);
//反射调用方法
Method method=null;
try {
method= service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsType());
Object invoke=method.invoke(service,rpcRequest.getParams());
return RpcResponse.sussess(invoke);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
System.out.println("方法执行错误");
return RpcResponse.fail();
}
}
}
3.10 实现本地服务存放器
因为一个服务器会有多个服务,所以需要设置一个本地服务存放器serviceProvider存放服务,在接收到服务端的request信息之后,我们在本地服务存放器找到需要的服务,通过反射调用方法,得到结果并返回
//本地服务存放器
public class ServiceProvider {
//集合中存放服务的实例
private Map<String,Object> interfaceProvider;
public ServiceProvider(){
this.interfaceProvider=new HashMap<>();
}
//本地注册服务
public void provideServiceInterface(Object service){
String serviceName=service.getClass().getName();
Class<?>[] interfaceName=service.getClass().getInterfaces();
for (Class<?> clazz:interfaceName){
interfaceProvider.put(clazz.getName(),service);
}
}
//获取服务实例
public Object getService(String interfaceName){
return interfaceProvider.get(interfaceName);
}
}
3.11 客户端主程序
public class TestClient {
public static void main(String[] args) {
ClientProxy clientProxy=new ClientProxy("127.0.0.1",9999);
UserService proxy=clientProxy.getProxy(UserService.class);
User user = proxy.getUserByUserId(1);
System.out.println("从服务端得到的user="+user.toString());
User u=User.builder().id(100).userName("wxx").sex(true).build();
Integer id = proxy.insertUserId(u);
System.out.println("向服务端插入user的id"+id);
}
}
3.12 服务端主程序
public class TestServer {
public static void main(String[] args) {
UserService userService=new UserServiceImpl();
ServiceProvider serviceProvider=new ServiceProvider();
serviceProvider.provideServiceInterface(userService);
RpcServer rpcServer=new SimpleRPCRPCServer(serviceProvider);
rpcServer.start(9999);
}
}