什么是RPC
在分布式服务框架中,在java领域中有很多实现远程通讯的技术,例如RMI、Hessian、SOAP、ESB 和JMS。
要实现网路机器间的通讯,首先需要看看计算机网络通信的基本原来,在底层层面看,网络通信需要做得就是将流从一台计算机传输到另一台计算机,基于传输协议和网络IO来实现,其中传输协议比较出名的有TCP、UDP ,而TCP 和UDP 都是基于Socket概念上为某类应用场景而扩展出的传输协议,网络IO主要有BIO、NIO、AIO三种方式,所有的分布式应用通讯都基于这个原理而实现。
RPC 全称remote procedure call,即远程过程调用。借助RPC 可以做到像本地调用一样调用远程服务,是一种进程间的通信方式。RPC 并不是一种具体的技术,而是指整个网路远程调用过程。
一个完整的RPC框架里面包含了四个核心组件,分别是Client、Client Stub、Server和Server Stub。 这个Stub可以理解为存根。
- 客户端(Client): 服务的调用方;
- 客户端存根(Client Stub): 存放服务端的地址消息,再将客户端的请求参数打包成网络消息,然后通过网络远程发送给服务方;
- 服务端(Server):真正的服务提供者;
- 服务端存根(Server Stub):接收客户端发送过来的消息,将消息解包,并调用本地的方法。
RPC 框架调用流程如下:
(1)客户端(Client)以本地调用方式(即以接口的方式)调用服务;
(2)客户端存根(client stub)接收到调用后,负责将方法、参数等组装成能够进行网络传输的消息体 (将消息体对象序列化为二进制);
(3)客户端通过socket将消息发送到服务端;
(4)服务端存根( server stub)收到消息后进行解码(将消息对象反序列化);
(5)服务端村更(server stub)根据解码结果调用本地的服务;
(6)服务处理;
(7)本地服务执行并将结果返回给服务端存根(server stub);
(8)服务端存根(server stub) 将返回结果打包成消息(将结果消息对象序列化)
(9)服务端(server)通过socket 将消息发送到客户端;
(10)客户端存根(client stub)接收到结果消息,并进行解码(将结果消息发序列化)
(11)客户端(client)得到最终结果。
什么是RMI
java RMI ,即远程方法调用(Remote Method Invocation),一种用于实现远程过程调用(RPC-Remote procedure call)的java API,能直接传输序列化后的java对象。它的实现依赖于java虚拟机,因为它仅支持从一个JVM 到另一个JVM的调用。
具体流程如下:
1、客户端从远程服务器的注册表中查询并获取远程对象引用;
2、桩对象与远程对象具有相同的接口和方法列表,当客户端调用远程对象时 ,实际上是由相应的桩对象代理完成的;
3、远程引用层在将桩的本地引用转换为服务器上对象的远程引用后,再将调用传输层(Transport),由传输层通过TCP协议发送调用;
4、在服务器端,传输层监听入站连接,它一旦接收到客户端远程调用后,就将这个引用转发给其上 层的远程引用层;
5、服务器端的远程引用层将客户端发送的远程应用转换为本地虚拟机的引用后,再将请求传递给骨架(Skeleton);
6、骨架读取参数,又将请求传递给服务器,最后由服务 器进行实际的方法调用;
7、如果远程方法调用后有返回值,则服务器将这些结果又沿着“骨架->远程引用层->传输层”向下传递;
8、客户端的传输层接收到返回值后,又沿着“传输层->远程引用层->桩”向上传递,然后由桩来反序 列化这些返回值,并将最终的结果传递给客户端程序。
一个简单的RMI 实现需求如下:
(1)服务端提供根据ID查询用户的方法;
(2)客户端调用服务端方法,并返回用户对象;
(3)要求使用RMI 进行远程调用;
具体实现如下:
(1)首先,创建一个远程接口,该接口继承自java.rmi.Remote 接口,并声明需要远程调用方法,
package RMI;
import java.rmi.Remote;
import java.rmi.RemoteException;
public interface IUserService extends Remote {
User getById(int id) throws RemoteException;
}
(2)然后,创建一个实现远程调用接口的类,
package RMI;
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
import java.util.HashMap;
import java.util.Map;
public class UserServiceImpl extends UnicastRemoteObject implements IUserService {
Map<Object,User> userMap = new HashMap<>();
protected UserServiceImpl() throws RemoteException {
super();
User user1 = new User(1,"张三");
User user2 = new User(2,"李四");
userMap.put(user1.getId(),user1);
userMap.put(user2.getId(),user2);
}
@Override
public User getById(int id) throws RemoteException {
return userMap.get(id);
}
}
实体类User代码如下:
package RMI;
import java.io.Serializable;
public class User implements Serializable {
private static final long serialVersionUID = -7188208721293891735L;
private int id;
private String name;
public User(int id, String name) {
this.id = id;
this.name = name;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "User{" +
"id=" + id +
", name='" + name + '\'' +
'}';
}
}
需要注意的是, 实体类一定需要序列化。
(3)接下来,创建一个服务器端程序,用于注册远程对象,
package RMI;
import java.rmi.AlreadyBoundException;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
public class RMIServer {
public static void main(String[] args) {
try {
//1.注册Registry实例,绑定端口
Registry registry = LocateRegistry.createRegistry(9998);
//2.创建远程对象
IUserService userService = new UserServiceImpl();
//3。将远程对象注册到RMI服务器上(即服务端注册表上)
registry.bind("userService", userService);
System.out.println("====RMI服务端启动成功======");
} catch (RemoteException | AlreadyBoundException e) {
e.printStackTrace();
}
}
}
(4)最后,创建一个客户端程序,用于调用远程对象的方法,
package RMI;
import java.rmi.NotBoundException;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
public class RMIClient {
public static void main(String[] args) throws RemoteException, NotBoundException {
//1.获取Registry实例
Registry registry = LocateRegistry.getRegistry("localhost",9998);
//2.通过Registry 实例查找远程对象
IUserService userService = (IUserService) registry.lookup("userService");
User user = userService.getById(2);
System.out.println(user.getId()+"--------"+user.getName());
}
}
运行服务器端程序,然后运行客户端程序。客户端程序将调用服务器端的getById()方法,并打印返回的结果。
基于netty实现RPC框架
用netty实现一个简单的RPC框架,消费者和提供者约定接口的协议,消费者远程调用提供者的服务。
(1) 在pom.xml 中引入maven
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
<version>1.18.8 </version>
</dependency>
<!-- 高性能 nio 框架依赖 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>Hoxton.SR1 </version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version> Hoxton.SR1</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.65.Final </version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version> 2.8.5</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version> </version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot</artifactId>
<version> 2.2.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
<version>2.2.1.RELEASE</version>
</dependency>
(2)创建一个rpc-api 项目,主要内容为实体类pojo、定义服务接口、以及公共类。
实体类User:
package netty_rmi;
import lombok.Data;
import java.io.Serializable;
@Data
public class User {
private int id;
private String name;
public User(int id, String name) {
this.id = id;
this.name = name;
}
@Override
public String toString() {
return "User{" +
"id=" + id +
", name='" + name + '\'' +
'}';
}
}
公共接口类IUserService:
package netty_rmi;
import java.rmi.Remote;
import java.rmi.RemoteException;
public interface IUserService {
User getById(int id) ;
}
请求头封装类:
package netty_rmi;
import lombok.Data;
/**
* 封装的请求对象
*/
@Data
public class RpcRequest {
/**
* 请求对象的ID
*/
private String requestId;
/**
* 类名
*/
private String className;
/**
* 方法名
*/
private String methodName;
/**
* 参数类型
*/
private Class<?>[] parmeterTypes;
/**
* 入参
*/
private Object[] parameters;
}
请求返回封装类:
package netty_rmi;
import lombok.Data;
@Data
public class RpcResponse {
/**
* 响应id
*/
private String requestId;
/**
* 错误信息
*/
private String error;
/**
* 返回的结果
*/
private Object result;
}
(3)创建服务提供者rpc-provider。
对外接口类的实现类UserServiceImpl 如下:
package netty_rmi;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
@RpcService
@Service
public class UserServiceImpl implements IUserService {
Map<Object, User> userMap = new HashMap<>();
@Override
public User getById(int id) {
if (userMap.size() == 0) {
User user1 = new User(1, "张三");
User user2 = new User(2, "李四");
userMap.put(user1.getId(), user1);
userMap.put(user2.getId(), user2);
}
return userMap.get(id);
}
}
注解类RpcService如下:
package netty_rmi;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* 对外暴露服务接口
*/
@Target(ElementType.TYPE) //用于接口和类上
@Retention(RetentionPolicy.RUNTIME) //在运行时可以获取到
public @interface RpcService {
}
服务Netty启动类RpcServer:
package netty_rmi;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* 启动类
*/
@Service
public class RpcServer implements DisposableBean {
private NioEventLoopGroup bossGroup;
private NioEventLoopGroup workGroup;
@Autowired
RpcServerHandler rpcServerHandler;
public void startServer(String ip, int port) {
try {
//1.创建线程组
bossGroup = new NioEventLoopGroup(1);
workGroup = new NioEventLoopGroup();
//2.创建服务端启动助手
ServerBootstrap serverBootstrap = new ServerBootstrap();
//3.设置参数
serverBootstrap.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//添加String 的编解码器
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
//业务处理类
pipeline.addLast(rpcServerHandler);
}
});
// 4.绑定端口
ChannelFuture sync = serverBootstrap.bind(ip,port).sync();
System.out.println("======= 服务端启动成功========");
sync.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
if (bossGroup != null){
bossGroup.shutdownGracefully();
}
if (workGroup != null){
workGroup.shutdownGracefully();
}
}
}
@Override
public void destroy() throws Exception {
if (bossGroup != null){
bossGroup.shutdownGracefully();
}
if (workGroup != null){
workGroup.shutdownGracefully();
}
}
}
服务业务处理类RpcServerHandler:
package netty_rmi;
import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.springframework.beans.BeansException;
import org.springframework.cglib.reflect.FastClass;
import org.springframework.cglib.reflect.FastMethod;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* 服务端业务处理类
* 1、将标有@RpcService 注解的bean缓存
* 2、接收客户端请求
* 3、根据传递过来的beanName 从缓存中查找到对应的bean
* 4、解析请求中的方法名称,参数类型 参数信息
* 5、反射调用bean的方法
* 6、给客户端进行响应
*/
@Component
@ChannelHandler.Sharable
public class RpcServerHandler extends SimpleChannelInboundHandler<String> implements ApplicationContextAware {
private static final Map SERVCIE_INSTANCE_MAP = new ConcurrentHashMap<>();
/**
* 通道读取就绪事件
* @param channelHandlerContext
* @param msg
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
//1.接收客户端请求-将msg转化RPPCRequest对象
RpcRequest rpcRequest = JSON.parseObject( (String) msg,RpcRequest.class);
RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setRequestId(rpcRequest.getRequestId());
try {
//业务处理
rpcResponse.setResult(handler(rpcRequest));
}catch (Exception exception){
exception.printStackTrace();
rpcResponse.setError(exception.getMessage());
}
//6.给客户端进行响应
channelHandlerContext.writeAndFlush(JSON.toJSONString(rpcResponse));
}
/**
* 业务处理逻辑
* @param rpcRequest
* @return
*/
private Object handler(RpcRequest rpcRequest) throws InvocationTargetException {
//3.根据传递过来的beanName 从缓存中查找对应的bean
Object serviceBean = SERVCIE_INSTANCE_MAP.get(rpcRequest.getClassName());
if (serviceBean == null){
throw new RuntimeException("根据beanName找不到服务,beanName:"+rpcRequest.getClassName());
}
//4.解析请求中的方法名称,参数类型 参数信息
Class<?> serviceBeanClass = serviceBean.getClass();
String methodName = rpcRequest.getMethodName();
Class<?>[] parameterTypes = rpcRequest.getParmeterTypes();
Object[] parameters = rpcRequest.getParameters();
//5.反射调用的bean反复-CGLIB 反射调用
FastClass fastClass = FastClass.create(serviceBeanClass);
FastMethod method = fastClass.getMethod(methodName,parameterTypes);
return method.invoke(serviceBean,parameters);
}
/**
* 将标有@RPCService 注解的bean缓存
* @param applicationContext
* @throws BeansException
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String ,Object> serviceMap = applicationContext.getBeansWithAnnotation(RpcService.class);
if (serviceMap != null && serviceMap.size()>0){
Set<Map.Entry<String,Object>> entries = serviceMap.entrySet();
for (Map.Entry<String,Object> item : entries){
Object serviceBean = item.getValue();
if (serviceBean.getClass().getInterfaces().length == 0){
throw new RuntimeException("服务必须实现接口");
}
//默认取第一个接口作为缓存bean的名称
String name = serviceBean.getClass().getInterfaces()[0].getName();
SERVCIE_INSTANCE_MAP.put(name,serviceBean);
}
}
}
}
启动类ServerBootstrap
package netty_rmi;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ServerBootstrapApplication implements CommandLineRunner {
@Autowired
RpcServer rpcServer;
public static void main(String[] args) {
SpringApplication.run(ServerBootstrapApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
new Thread(new Runnable() {
@Override
public void run() {
rpcServer.startServer("127.0.0.1", 8899);
}
}).start();
}
}
(4) 创建服务消费者rpc-consumer
客户端Netty启动类:
package netty_rmi;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import netty_rmi.RpcClientHandler;
import java.util.concurrent.*;
/**
* 客户端:
* 1、连接Netty 服务端
* 2、提供给调用者主动关闭资源的方法
* 3、提供消息发送的方法
*/
public class RpcClient {
private EventLoopGroup group;
private Channel channel;
private String ip;
private int port;
private RpcClientHandler rpcClientHandler = new RpcClientHandler();
private ExecutorService executorService = Executors.newCachedThreadPool();
public RpcClient(String ip, int port) {
this.ip = ip;
this.port = port;
initClient();
}
/**
* 初始化方法--连接Netty服务端
*/
private void initClient() {
try {
//1.创建线程组
group = new NioEventLoopGroup();
//2.创建启动助手
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE,Boolean.TRUE)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,3000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringEncoder());
pipeline.addLast(new StringDecoder());
pipeline.addLast(rpcClientHandler);
}
});
//4.连接netty 服务端
channel = bootstrap.connect(ip,port).sync().channel();
} catch (InterruptedException e) {
e.printStackTrace();
if (channel != null){
channel.close();
}
if (group != null){
group.shutdownGracefully();
}
}
}
/**
* 提供给调用者主动关闭资源的方法
*/
public void close() {
if (channel != null){
channel.close();
}
if (group != null){
group.shutdownGracefully();
}
}
/**
* 提供消息发送
* @param msg 消息内容
* @return
*/
public Object send(String msg) throws ExecutionException, InterruptedException {
rpcClientHandler.setReqeustMsg(msg);
Future submit =executorService.submit(rpcClientHandler);
return submit.get();
}
}
客户端业务处理类:
package netty_rmi;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.concurrent.Callable;
/**
* 客户端处理类
* 1、发送消息
* 2、接收消息
*/
public class RpcClientHandler extends SimpleChannelInboundHandler<String> implements Callable {
ChannelHandlerContext context;
//发送的消息
String reqeustMsg;
//服务端的消息
String responseMsg;
public void setReqeustMsg(String reqeustMsg){
this.reqeustMsg = reqeustMsg;
}
@Override
protected synchronized void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
responseMsg = msg;
//唤醒等待的线程
notify();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
context = ctx;
}
@Override
public synchronized Object call() throws Exception{
//发送消息
context.writeAndFlush(reqeustMsg);
//线程等待
wait();
return responseMsg;
}
}
RPC代理类
package netty_rmi;
import com.alibaba.fastjson.JSON;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.UUID;
/**
* 客户端代理类-创建代理对象
* 1、封装request请求对象
* 2、创建RPcClient对象
* 3、发送消息
* 4、返回结果
*/
public class RpcClientProxy {
public static Object createProxy(Class serviceClass){
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{serviceClass}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//1.封装request 请求对象
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setRequestId(UUID.randomUUID().toString());
rpcRequest.setClassName(method.getDeclaringClass().getName());
rpcRequest.setMethodName(method.getName());
rpcRequest.setParmeterTypes(method.getParameterTypes());
rpcRequest.setParameters(args);
//2.创建RPCClient对象
RpcClient rpcClient = new RpcClient("localhost",8899) ;
try {
//3.发送消息
Object responseMsg = rpcClient.send(JSON.toJSONString(rpcRequest));
RpcResponse rpcResponse = JSON.parseObject(responseMsg.toString(), RpcResponse.class);
if (rpcResponse.getError() != null){
throw new RuntimeException(rpcResponse.getError());
}
//返回结果
Object result = rpcResponse.getResult();
System.out.println("result:"+result);
return JSON.parseObject(result.toString(),method.getReturnType());
}catch(Exception e){
throw e;
}finally {
rpcClient.close();
}
}
});
}
}
客户端启动类ClientBootStrap
package netty_rmi;
import java.rmi.RemoteException;
public class ClientBootStrap {
public static void main(String[] args) throws RemoteException {
IUserService userService= (IUserService) RpcClientProxy.createProxy(IUserService.class);
User user = userService.getById(1);
System.out.println(user.getId());
}
}