Netty 实现dubbo rpc

news2025/1/4 9:29:58

一、RPC 的基本介绍

  RPC (Remote Procedure Call) 远程过程调用,是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外的为这个交互编程。也就是说可以达到两个或者多个应用程序部署在不同的服务器上,他们之间的调用都像是本地方法调用一样。RPC 的调用如下图。

常用的RPC 框架有阿里的dubbo,Google的gRPC,Go 语言的rpcx,Apache的thrift,Spring的Spring Cloud.

若想了解dubbo与Spring Cloud的区别参考:SpringCloud 与 Dubbo 的区别,终于有人讲明白了...-腾讯云开发者社区-腾讯云

二、RPC 调用的过程

在RPC 中,Client 端叫做服务消费者,Server 叫做服务提供者。

调用流程说明

  • 服务消费方(client)以本地调用方式调用服务
  • client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
  • client stub 将消息进行编码并发送到服务端
  • server stub 接收到消息后进行解码
  • server stub 根据解码结果调用本地的服务
  • 本地服务执行并将结果返回给server stub
  • server stub 将返回导入结果进行编码并发送给消费方
  • client stub 接收到消息并进行解码
  • 服务消费方(client) 得到结果
  • 其中,RPC 框架的目标就是把2-8 这些步骤封装起来,用户无需关心这些细节,可以像调用本地方法一样即可完成远程服务调用。

三、dubbo RPC

1.需求说明

dubbo 底层使用了Netty 作为网络通信框架,要求用netty 实现一个简单的RPC框架。

模仿dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信给予Netty 4.x。

2.设计说明

创建一个接口,定义抽象方法。用于消费者和提供者之间的约定。

创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。

创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用netty请求提供者返回数据。 开发的分析图如下:

3.代码实现

netty用的包:4.1.20.Final。pom.xml如下:

<dependency>
   <groupId>io.netty</groupId>
   <artifactId>netty-all</artifactId>
   <version>4.1.20.Final</version>
</dependency>

1)公共接口

/**
 * @author: fqtang
 * @date: 2024/05/05/21:51
 * @description: 服务提供方和服务消费方都需要
 */
public interface HelloService {

	String say(String mes);
}

2)公共接口实现类

import org.springframework.util.StringUtils;
import com.tfq.netty.dubborpc.publicinterface.HelloService;

/**
 * @author: fqtang
 * @date: 2024/05/05/21:53
 * @description: 描述
 */
public class HelloServiceImpl implements HelloService {

	private static int count = 0;

	/**
	 * 当有消费方调用该方法时就返回一个结果
	 *
	 * @param mes 传入消息
	 * @return 返回结果
	 */
	@Override
	public String say(String mes) {
		System.out.println("收到客户端消息=" + mes);
		if(StringUtils.isEmpty(mes)) {
			return "你好客户端,我已经收到你的消息 ";
		}else{
			return "你好客户端,我已经收到你的消息:【" + mes+"】,第 "+(++count)+"次。";
		}
	}
}

3)服务提供者

import com.tfq.netty.dubborpc.netty.NettyServer;

/**
 * @author: fqtang
 * @date: 2024/05/05/21:57
 * @description: 启动服务提供者,就是NettyServer
 */
public class ServerBootstrap {

	public static void main(String[] args) {

		String hostName="127.0.0.1";
		int port = 8001;
		NettyServer.startServer(hostName,port);
	}

}



import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * @author: fqtang
 * @date: 2024/05/05/21:59
 * @description: 描述
 */
public class NettyServer {

	public static void startServer(String hostName,int port){
		startServer0(hostName,port);
	}

	/**
	 * 编写一个方法,完成对Netty Server的初始化工作和启动
	 * @param hostName
	 * @param port
	 */
	private static void startServer0(String hostName,int port){
		EventLoopGroup bossGroup = new NioEventLoopGroup(1);
		EventLoopGroup workerGroup = new NioEventLoopGroup();

		try{
			ServerBootstrap serverBootstrap = new ServerBootstrap();

			serverBootstrap.group(bossGroup,workerGroup)
				.channel(NioServerSocketChannel.class)
				.childHandler(new ChannelInitializer<SocketChannel>() {
					@Override
					protected void initChannel(SocketChannel ch) throws Exception {
						ChannelPipeline pipeline = ch.pipeline();
						pipeline.addLast(new StringDecoder());
						pipeline.addLast(new StringEncoder());
						pipeline.addLast(new NettyServerHandler());
					}
				});

			ChannelFuture channelFuture = serverBootstrap.bind(hostName,port).sync();
			System.out.println("服务提供方开始提供服务~~~");
			channelFuture.channel().closeFuture().sync();
		}catch(Exception e){
			e.printStackTrace();
		}finally {
			bossGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}

	}
}



import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import com.tfq.netty.dubborpc.consumer.ClientBootstrap;
import com.tfq.netty.dubborpc.provider.HelloServiceImpl;

/**
 * @author: fqtang
 * @date: 2024/05/05/22:03
 * @description: 描述
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		//获取客户端调用的消息,并调用服务
		System.out.println("msg = " + msg);
		//客户端在调用服务器的时候,需要定义一个协议。比如我们要求每次发消息时,都必须以某个字符器开头
		//比如:dubboserver#hello#xxxx
		if(msg.toString().startsWith(ClientBootstrap.ProtocolHeader)) {
			String res = new HelloServiceImpl().say(msg.toString()
				.substring(msg.toString()
					.lastIndexOf("#") + 1));
			ctx.writeAndFlush(res);
		}
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		ctx.close();
	}
}


4)消费者

import com.tfq.netty.dubborpc.netty.NettyClient;
import com.tfq.netty.dubborpc.publicinterface.HelloService;

/**
 * @author: fqtang
 * @date: 2024/05/05/23:26
 * @description: 消费者
 */
public class ClientBootstrap {

	/**
	 * 这里定义协议头
	 */
	public static final String ProtocolHeader = "dubboserver#say#";

	public static void main(String[] args) throws InterruptedException {
		//创建一个消费者
		NettyClient customer = new NettyClient();
		//创建代理对象
		HelloService helloService = (HelloService) customer.getBean(HelloService.class, ProtocolHeader);
		while(true) {
			Thread.sleep(10 * 1000);
			//通过代理对象调用提供者的方法(服务)
			String res = helloService.say("你好 dubbo~");
			System.out.println("调用的结果 res = " + res);
		}
	}
}



import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * @author: fqtang
 * @date: 2024/05/05/23:04
 * @description: 描述
 */
public class NettyClient {

	//创建一个线程池
	private static ExecutorService executorService = Executors.newFixedThreadPool(2);

	private static NettyClientHandler clientHandler;

	/**
	 * 编写方法使用代理模式,获取一个代理对象
	 * @param serviceClass
	 * @param protocolHeader
	 * @return
	 */
	public Object getBean(final Class<?> serviceClass, final String protocolHeader) {

		return Proxy.newProxyInstance(Thread.currentThread()
				.getContextClassLoader(),
			new Class<?>[]{serviceClass}, (proxy, method, args) -> {
				if(clientHandler == null) {
					initClient("127.0.0.1", 8001);
				}
				//设置要发送给服务器端的信息,protocolHeader为协议头[dubboserver#hello#],
				//args[0] 就是客户端调用api say(???),参数
				clientHandler.setParam(protocolHeader + args[0]);
				return executorService.submit(clientHandler).get();
			});
	}

	private static void initClient(String hostName, int port) {
		EventLoopGroup worker = new NioEventLoopGroup();
		try {
			clientHandler = new NettyClientHandler();
			Bootstrap bootstrap = new Bootstrap();
			bootstrap.group(worker)
				.channel(NioSocketChannel.class)
				.option(ChannelOption.TCP_NODELAY, true)
				.handler(new ChannelInitializer<SocketChannel>() {
					@Override
					protected void initChannel(SocketChannel ch) throws Exception {
						ChannelPipeline channelPipeline = ch.pipeline();
						channelPipeline.addLast(new StringDecoder());
						channelPipeline.addLast(new StringEncoder());
						channelPipeline.addLast(clientHandler);
					}
				});

			ChannelFuture channelFuture = bootstrap.connect(hostName, port)
				.sync();
			/*channelFuture.channel()
				.closeFuture()
				.sync();*/
		} catch(InterruptedException e) {
			e.printStackTrace();
		} /*finally {
			worker.shutdownGracefully();
		}*/
	}
}



package com.tfq.netty.dubborpc.netty;

import java.util.concurrent.Callable;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * @author: fqtang
 * @date: 2024/05/05/22:48
 * @description: 描述
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {

	private ChannelHandlerContext context;
	/**
	 * 返回的结果
	 */
	private String result;
	/**
	 * 客户端调用方法返回的参数
	 */
	private String param;

	/**
	 * 与服务器的连接创建后,就会被调用,这个方法被第一个,调用(1)
	 * @param ctx
	 * @throws Exception
	 */
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		//因为在其他方法会使用到这个ctx
		context = ctx;
		System.out.println("调用(1) channelActive--->连接到服务器");
	}

	/**
	 *  被调用(4)
	 * 收到服务器的数据后,调用方法
	 * @param ctx
	 * @param msg
	 * @throws Exception
	 */
	@Override
	public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		result = (String) msg;
		System.out.println("调用(4)channelRead--->从服务器读取到数据:"+result);
		//唤醒等待的线程
		notify();
		System.out.println("调用(4)channelRead---notify()---->从服务器读取到数据后唤醒线程.....");
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		ctx.close();
	}

	/**
	 * 被调用(3), 被调用(5)
	 * 被代理对象调用,发送数据给服务器,--->wait ---> 等待被唤醒 --->返回结果
	 * @return
	 * @throws Exception
	 */
	@Override
	public synchronized Object call() throws Exception {
		context.writeAndFlush(param);
		System.out.println("调用(3) call()--->被代理对象调用,发送数据给服务器.....");
		//进行wait,等待channelRead 方法获取到服务器的结果后,唤醒
		wait();
		System.out.println("调用(5) call()--->wait() 等待channelRead 方法获取到服务器的结果后.....");
		return result;
	}

	/**
	 * 被调用(2)
	 * @param param
	 */
	void setParam(String param){
		System.out.println("调用(2) setParam()--->被代理对象调用,发送数据给服务器.....");
		this.param = param;
	}
}

若有问题请留言。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1647174.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于BP神经网络的QPSK解调算法matlab性能仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 matlab2022a 3.部分核心程序 ........................................................................ for ij 1:leng…

神经网络中的归一化

我们今天介绍一下神经网络中的归一化方法~ 之前学到的机器学习中的归一化是将数据缩放到特定范围内&#xff0c;以消除不同特征之间的量纲和取值范围差异。通过将原始数据缩放到一个特定的范围内&#xff0c;比如[0,1]或者[-1,1]&#xff0c;来消除不同特征之间的量纲和取值范围…

弹性云服务器给用户带来了哪些便利

什么是弹性云服务器&#xff1f; 弹性云服务器&#xff08;ECS&#xff0c;Elastic Cloud Server&#xff09;简单地说&#xff0c;是指运行在云计算环境中的虚拟服务器。弹性云服务器可以说是虚拟专用服务器(VPS)&#xff0c;但VPS却不能说是云服务器。这是因为两者有着本质的…

软件游戏丢失XINPUT1_4.dll文件的多种解决方法分享

当玩家在尝试启动某款游戏时&#xff0c;遇到了系统提示“游戏找不到XINPUT1_4.dll”&#xff0c;这个错误通常发生在玩家尝试启动游戏时&#xff0c;游戏无法找到所需的XINPUT1_4.dll文件&#xff0c;呆滞无法正常启动运行。但是幸运的是&#xff0c;有一些简单的修复方法可以…

【typescript测试 - Jest 配置与使用】

安装 npm install --save-dev types/jestnpm install --save-dev ts-jest配置 tsconfig.json {"compilerOptions": {"types": ["jest"]} }jest.config.js module.exports {preset: ts-jest,testEnvironment: node, };使用 // add.js funct…

数据分析——业务指标量化

业务指标量化 前言一、统计指标二、统计指标特点完整的统计指标统计指标的理解和使用方法 三、统计指标类型总量指标时期指标时点指标总量指标的作用 相对指标计划完成相对数指标结构相对数指标比例相对数指标比较相对数指标动态相对数指标 平均指标 四、数量指标和质量指标五、…

【1小时掌握速通深度学习面试8】生成模型-中

目录 28.DBN与DBM 有什么区别? 29.VAE如何控制生成图像的类别? 30.如何修改VAE的损失函数&#xff0c;使得隐藏层的编码是相互解耦的? 31.自回归方法如何应用在生成模型上? 32.原始 VAE存在哪些问题? 有哪些改进方式? 33.如何将VAE与GAN 进行结合&#xff1f; 34.…

Rust Postgres实例

Rust Postgres介绍 Rust Postgres是一个纯Rust实现的PostgreSQL客户端库&#xff0c;无需依赖任何外部二进制文件2。这意味着它可以轻松集成到你的Rust项目中&#xff0c;提供对PostgreSQL的支持。 特点 高性能&#xff1a;Rust Postgres提供了高性能的数据库交互功能&#…

C++使用单链表实现一元多项式的加,乘操作

相邀再次喝酒 待 葡萄成熟透 但是命运入面 每个邂逅 一起走到了 某个路口 是敌与是友 各自也没有自由 位置变了 各有队友 首先&#xff0c;按照惯例&#xff0c;十分欢迎大家边听歌边观看本博客&#xff01;&#xff01; 最佳损友 - 陈奕迅 - 单曲 - 网易云音乐 (163.com) 一…

ABAP 第二代增强-采购申请子屏幕增强

文章目录 第二代增强-采购申请子屏幕增强需求实现过程创建项目运行效果客户屏幕的PBO全局变量获取数据更新数据运行效果查询底表修改数据 第二代增强-采购申请子屏幕增强 需求 实现过程 创建项目 运行效果 客户屏幕的PBO 全局变量 *&------------------------------------…

python_5

# 制作一个注册登录模块 # 注册&#xff1a;将用户填入的账户和密码保存到一个文件(users.bin) # 登陆&#xff1a;将用户填入账户密码和users.bin中保存的账户密码进行比对,如果账户和密码完全相同 那么登录成功&#xff0c;否则登录失败 import hashlib import json import o…

Petalinux的使用——定制Linux系统

文章目录 配置petalinux运行环境petalinux设计流程 配置petalinux运行环境 Petalinux的安装在文章Ubuntu镜像源的更改及其Petalinux的安装中已经介绍&#xff0c;下面介绍petalinux运行环境的配置过程。 进入到petalinux的安装路径下&#xff0c;使用下面的命令对petalinux的运…

第十三章 计算机网络

这里写目录标题 1.网络设备2.协议簇2.1电子邮件(传输层)2.2地址解析(网际层)2.3DHCP(动态主动配置协议)2.4URL(统一资源定位器)2.5IP地址和子网掩码 1.网络设备 物理层&#xff1a;中继器&#xff0c;集线器(多路中继器) 数据链路层&#xff1a;网桥&#xff0c;交换机(多端口…

【微磁学3D绘图工具探索】Excalibur

文章目录 概要调查报告技术名词解释主要特点 技术和算法实现他能够画出怎样酷炫的图 小结 概要 微磁学中的磁学结构同时包括二维和三维&#xff0c;想要绘制得好看&#xff0c;结果清晰&#xff0c;那么就需要一些自己写的绘图代码之外的额外渲染功能&#xff0c;尤其是对于三…

JS hook cookie

JS hook cookie cookie 的值是V&#xff0c;v是动态变化的 可以看到D中生成了cookie的值n 尝试使用RPC定位到cookie。 替换内容&#xff0c;下断点。 将写好的RPC代码直接插入 加入代码&#xff0c;file.virjar.com/sekiro_web_client.js?_123 这个地址是在前端创建客户端…

python使用mongo操作

目前有个需求&#xff0c;就是把所有sql转为mongo管道查询 知识点 在 MongoDB 中&#xff0c;allowDiskUse 选项应该作为聚合命令的一个选项&#xff0c;而不是聚合管道的一个阶段。allowDiskUse 选项用于允许聚合操作使用磁盘空间来临时存储数据&#xff08;当聚合操作的数据…

Amazon Bedrock的进化:更多选择与新特性,助力生成式AI应用更快落地

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

YoloV9改进策略:Block改进|改进HCF-Net的PPA模块|附结构图|(独家改进,全网首发)

摘要 HCF-Net是一种用于红外小物体检测的深度学习网络。它主要包括三个模块:并行化斑块感知注意力(PPA)模块、维度感知选择性整合(DASI)模块和多稀释通道细化器(MDCR)模块。 PPA模块采用多分支特征提取策略,用于捕捉不同尺度和层次的特征信息。DASI模块可实现自适应信…

安卓使用Fiddler抓包 2024

简介 最近试了一下安卓使用fiddler 抓包&#xff0c;发现https包基本都会丢失。原因是Anandroid 7版本针对ssl安全性做了加强&#xff0c;不认可用户的证书。我们要做的就是把fiddler导出的证书进过处理后放置到系统证书目录下面&#xff0c;这样才能抓包https请求。 这里使用…

323_C++_QT_QProcess执行cmd解压、压缩、删除tar.gz等等其他压缩包文件到指定目录,不需要外部库,QT自带API的就行

// decompressPath : 解压到此目录 // fileName : 解压的tar.gz文件名executeCommand(decompressPath , QString::fromStdString(fileName));// 开始解压 void executeCommand