一、RPC 的基本介绍
RPC (Remote Procedure Call) 远程过程调用,是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外的为这个交互编程。也就是说可以达到两个或者多个应用程序部署在不同的服务器上,他们之间的调用都像是本地方法调用一样。RPC 的调用如下图。
常用的RPC 框架有阿里的dubbo,Google的gRPC,Go 语言的rpcx,Apache的thrift,Spring的Spring Cloud.
若想了解dubbo与Spring Cloud的区别参考:SpringCloud 与 Dubbo 的区别,终于有人讲明白了...-腾讯云开发者社区-腾讯云
二、RPC 调用的过程
在RPC 中,Client 端叫做服务消费者,Server 叫做服务提供者。
调用流程说明
- 服务消费方(client)以本地调用方式调用服务
- client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
- client stub 将消息进行编码并发送到服务端
- server stub 接收到消息后进行解码
- server stub 根据解码结果调用本地的服务
- 本地服务执行并将结果返回给server stub
- server stub 将返回导入结果进行编码并发送给消费方
- client stub 接收到消息并进行解码
- 服务消费方(client) 得到结果
- 其中,RPC 框架的目标就是把2-8 这些步骤封装起来,用户无需关心这些细节,可以像调用本地方法一样即可完成远程服务调用。
三、dubbo RPC
1.需求说明
dubbo 底层使用了Netty 作为网络通信框架,要求用netty 实现一个简单的RPC框架。
模仿dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信给予Netty 4.x。
2.设计说明
创建一个接口,定义抽象方法。用于消费者和提供者之间的约定。
创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。
创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用netty请求提供者返回数据。 开发的分析图如下:
3.代码实现
netty用的包:4.1.20.Final。pom.xml如下:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.20.Final</version>
</dependency>
1)公共接口
/**
* @author: fqtang
* @date: 2024/05/05/21:51
* @description: 服务提供方和服务消费方都需要
*/
public interface HelloService {
String say(String mes);
}
2)公共接口实现类
import org.springframework.util.StringUtils;
import com.tfq.netty.dubborpc.publicinterface.HelloService;
/**
* @author: fqtang
* @date: 2024/05/05/21:53
* @description: 描述
*/
public class HelloServiceImpl implements HelloService {
private static int count = 0;
/**
* 当有消费方调用该方法时就返回一个结果
*
* @param mes 传入消息
* @return 返回结果
*/
@Override
public String say(String mes) {
System.out.println("收到客户端消息=" + mes);
if(StringUtils.isEmpty(mes)) {
return "你好客户端,我已经收到你的消息 ";
}else{
return "你好客户端,我已经收到你的消息:【" + mes+"】,第 "+(++count)+"次。";
}
}
}
3)服务提供者
import com.tfq.netty.dubborpc.netty.NettyServer;
/**
* @author: fqtang
* @date: 2024/05/05/21:57
* @description: 启动服务提供者,就是NettyServer
*/
public class ServerBootstrap {
public static void main(String[] args) {
String hostName="127.0.0.1";
int port = 8001;
NettyServer.startServer(hostName,port);
}
}
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
* @author: fqtang
* @date: 2024/05/05/21:59
* @description: 描述
*/
public class NettyServer {
public static void startServer(String hostName,int port){
startServer0(hostName,port);
}
/**
* 编写一个方法,完成对Netty Server的初始化工作和启动
* @param hostName
* @param port
*/
private static void startServer0(String hostName,int port){
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new NettyServerHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(hostName,port).sync();
System.out.println("服务提供方开始提供服务~~~");
channelFuture.channel().closeFuture().sync();
}catch(Exception e){
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import com.tfq.netty.dubborpc.consumer.ClientBootstrap;
import com.tfq.netty.dubborpc.provider.HelloServiceImpl;
/**
* @author: fqtang
* @date: 2024/05/05/22:03
* @description: 描述
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//获取客户端调用的消息,并调用服务
System.out.println("msg = " + msg);
//客户端在调用服务器的时候,需要定义一个协议。比如我们要求每次发消息时,都必须以某个字符器开头
//比如:dubboserver#hello#xxxx
if(msg.toString().startsWith(ClientBootstrap.ProtocolHeader)) {
String res = new HelloServiceImpl().say(msg.toString()
.substring(msg.toString()
.lastIndexOf("#") + 1));
ctx.writeAndFlush(res);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
4)消费者
import com.tfq.netty.dubborpc.netty.NettyClient;
import com.tfq.netty.dubborpc.publicinterface.HelloService;
/**
* @author: fqtang
* @date: 2024/05/05/23:26
* @description: 消费者
*/
public class ClientBootstrap {
/**
* 这里定义协议头
*/
public static final String ProtocolHeader = "dubboserver#say#";
public static void main(String[] args) throws InterruptedException {
//创建一个消费者
NettyClient customer = new NettyClient();
//创建代理对象
HelloService helloService = (HelloService) customer.getBean(HelloService.class, ProtocolHeader);
while(true) {
Thread.sleep(10 * 1000);
//通过代理对象调用提供者的方法(服务)
String res = helloService.say("你好 dubbo~");
System.out.println("调用的结果 res = " + res);
}
}
}
import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
* @author: fqtang
* @date: 2024/05/05/23:04
* @description: 描述
*/
public class NettyClient {
//创建一个线程池
private static ExecutorService executorService = Executors.newFixedThreadPool(2);
private static NettyClientHandler clientHandler;
/**
* 编写方法使用代理模式,获取一个代理对象
* @param serviceClass
* @param protocolHeader
* @return
*/
public Object getBean(final Class<?> serviceClass, final String protocolHeader) {
return Proxy.newProxyInstance(Thread.currentThread()
.getContextClassLoader(),
new Class<?>[]{serviceClass}, (proxy, method, args) -> {
if(clientHandler == null) {
initClient("127.0.0.1", 8001);
}
//设置要发送给服务器端的信息,protocolHeader为协议头[dubboserver#hello#],
//args[0] 就是客户端调用api say(???),参数
clientHandler.setParam(protocolHeader + args[0]);
return executorService.submit(clientHandler).get();
});
}
private static void initClient(String hostName, int port) {
EventLoopGroup worker = new NioEventLoopGroup();
try {
clientHandler = new NettyClientHandler();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(worker)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline channelPipeline = ch.pipeline();
channelPipeline.addLast(new StringDecoder());
channelPipeline.addLast(new StringEncoder());
channelPipeline.addLast(clientHandler);
}
});
ChannelFuture channelFuture = bootstrap.connect(hostName, port)
.sync();
/*channelFuture.channel()
.closeFuture()
.sync();*/
} catch(InterruptedException e) {
e.printStackTrace();
} /*finally {
worker.shutdownGracefully();
}*/
}
}
package com.tfq.netty.dubborpc.netty;
import java.util.concurrent.Callable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* @author: fqtang
* @date: 2024/05/05/22:48
* @description: 描述
*/
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
private ChannelHandlerContext context;
/**
* 返回的结果
*/
private String result;
/**
* 客户端调用方法返回的参数
*/
private String param;
/**
* 与服务器的连接创建后,就会被调用,这个方法被第一个,调用(1)
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//因为在其他方法会使用到这个ctx
context = ctx;
System.out.println("调用(1) channelActive--->连接到服务器");
}
/**
* 被调用(4)
* 收到服务器的数据后,调用方法
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
result = (String) msg;
System.out.println("调用(4)channelRead--->从服务器读取到数据:"+result);
//唤醒等待的线程
notify();
System.out.println("调用(4)channelRead---notify()---->从服务器读取到数据后唤醒线程.....");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
/**
* 被调用(3), 被调用(5)
* 被代理对象调用,发送数据给服务器,--->wait ---> 等待被唤醒 --->返回结果
* @return
* @throws Exception
*/
@Override
public synchronized Object call() throws Exception {
context.writeAndFlush(param);
System.out.println("调用(3) call()--->被代理对象调用,发送数据给服务器.....");
//进行wait,等待channelRead 方法获取到服务器的结果后,唤醒
wait();
System.out.println("调用(5) call()--->wait() 等待channelRead 方法获取到服务器的结果后.....");
return result;
}
/**
* 被调用(2)
* @param param
*/
void setParam(String param){
System.out.println("调用(2) setParam()--->被代理对象调用,发送数据给服务器.....");
this.param = param;
}
}
若有问题请留言。