Java基础之《netty(31)—用netty实现RPC》

news2025/1/16 1:00:50

一、需求说明

1、dubbo底层使用了netty作为网络通讯框架,要求使用netty实现一个简单的RPC框架。

2、模仿dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信使用netty4.x。

二、设计说明

1、创建一个接口,定义抽象方法。用于消费者和提供者之间的约定。

2、创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。

3、创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用netty请求提供者返回数据。

三、代码

1、接口
HelloService.java

package netty.dubborpc.publicinterface;

/**
 * 接口,服务提供方和服务消费方都需要
 * @author user
 *
 */
public interface HelloService {

	public String hello(String msg);
}

2、netty服务端
NettyServer.java

package netty.dubborpc.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class NettyServer {

	public static void startServer(String hostname, int port) {
		startServer0(hostname, port);
	}
	
	private static void startServer0(String hostname, int port) {
		
		EventLoopGroup bossGroup = new NioEventLoopGroup(1);
		EventLoopGroup workerGroup = new NioEventLoopGroup(8);
		
		try {
			ServerBootstrap bootstrap = new ServerBootstrap();
			bootstrap.group(bossGroup, workerGroup)
				.channel(NioServerSocketChannel.class)
				.handler(new LoggingHandler(LogLevel.DEBUG))
				.childHandler(new ChannelInitializer<SocketChannel>() {

					@Override
					protected void initChannel(SocketChannel ch) throws Exception {
						ChannelPipeline pipeline = ch.pipeline();
						pipeline.addLast(new StringDecoder());
						pipeline.addLast(new StringEncoder());
						pipeline.addLast(new NettyServerHandler()); //业务处理类
						
					}
					
				}); //自定义一个初始化类
			
			ChannelFuture cf = bootstrap.bind(7000).sync();
			System.out.println("服务提供方启动,开始监听了......");
			cf.channel().closeFuture().sync();
			
		} catch (Exception e) {
			e.printStackTrace();
			
		} finally {
			bossGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}
	}
}

NettyServerHandler.java

package netty.dubborpc.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import netty.dubborpc.provider.HelloServiceImpl;

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
		ctx.channel().close();
	}
	
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		//获取客户端发送的消息,并调用服务
		System.out.println("msg=" + msg);
		
		//客户端在调用服务器的api时,我们需要定义一个协议
		//比如我们要求,每次发消息时,都必须以某个字符串开头 "HelloService#hello#"
		if (msg.toString().startsWith("HelloService#hello#")) {
			//去除协议头
			//这里可以用反射生成处理类
			//还要考虑粘包拆包问题
			String result = new HelloServiceImpl().hello(msg.toString().substring(19));
			ctx.writeAndFlush(result);
		}
		
	}
}

3、netty客户端
NettyClient.java

package netty.dubborpc.netty;

import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class NettyClient {

	//创建线程池
	private static ExecutorService executor = Executors.newFixedThreadPool(10);
	
	private static NettyClientHandler client;
	
	//编写方法,使用代理模式获取代理对象
	public Object getBean(final Class<?> serviceClass, final String protocolHeader) {
		
		return Proxy.newProxyInstance(
				Thread.currentThread().getContextClassLoader(), 
				new Class<?>[] {serviceClass}, 
				//lambel表达式就是实现的接口InvocationHangler的invoke方法
				(proxy, method, args) -> {
					
					//这部分代码,客户端每调用一次hello,就会进入该代码块
					
					if (client == null) {
						initClient();
						initConnect();
					}
					
					//设置要发给服务器的信息
					//protocolHeader协议头,args[0]就是客户端调用api hello()里传的参数
					client.setParam(protocolHeader + args[0]);
					
					return executor.submit(client).get();
				});
	}
	
	//初始化客户端
	private static void initClient() {
		client = new NettyClientHandler();
	}
	
	//初始化连接
	private static void initConnect() {
		EventLoopGroup group = new NioEventLoopGroup();
		
		try {
			Bootstrap bootstrap = new Bootstrap();
			bootstrap.group(group) //设置线程组
				.channel(NioSocketChannel.class)
				.option(ChannelOption.TCP_NODELAY, true)
				.handler(new LoggingHandler(LogLevel.DEBUG))
				.handler(new ChannelInitializer<SocketChannel>() {

					@Override
					protected void initChannel(SocketChannel ch) throws Exception {
						ChannelPipeline pipeline = ch.pipeline();
						pipeline.addLast(new StringDecoder());
						pipeline.addLast(new StringEncoder());
						pipeline.addLast(client);
					}
				}); //自定义一个初始化对象
			
			ChannelFuture cf = bootstrap.connect("127.0.0.1", 7000).sync();
			
			//这里不能阻塞必须返回,因为后续代理还要调用call方法,所以不能closeFuture sync,但可以对closeFuture加一个listener回调
			//cf.channel().closeFuture().sync();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			//这里也不能shutdown
			//group.shutdownGracefully();
		}
	}
}

NettyClientHandler.java

package netty.dubborpc.netty;

import java.util.concurrent.Callable;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable<Object> {

	//定义属性
	//感觉这三个变量都有并发问题???
	private ChannelHandlerContext context; //上下文
	private String result; //调用后返回的结果
	private String param; //客户端调用方法时,传入的参数
	
	/**
	 * Callable接口有一个非常重要的方法call()
	 * 被代理对象调用,发送数据给服务器 -> wait -> 等待被唤醒 -> 返回结果
	 * 一句话,就是客户端要自己控制什么时候发消息,channelActive不行,而且还要等待结果返回所以要wait,等channelRead返回后call再返回
	 */
	@Override
	public synchronized Object call() throws Exception {
		System.out.println("call 被调用 before");
		context.writeAndFlush(param); //把参数发过去
		//进行wait
		wait(); //等待channelRead方法获取到服务器的结果后,唤醒
		System.out.println("call 被调用 after");
		return result; //服务方返回的结果
	}

	/**
	 * 与服务器的连接创建成功后,就会被调用
	 */
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		System.out.println("channelActive 被调用");
		if (context == null) {
			context = ctx; //因为在其他方法会使用到这个ctx
		}
		
	}
	
	/**
	 * 收到服务器的数据后,就会被调用
	 */
	@Override
	public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		System.out.println("channelRead 被调用");
		result = msg.toString();
		notify(); //唤醒等待的线程
	}
	
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
		ctx.channel().close();
	}
	
	public void setParam(String param) {
		System.out.println("setParam 被调用");
		this.param = param;
	}
	
}

4、服务提供者
ServerBootstrap.java

package netty.dubborpc.provider;

import netty.dubborpc.netty.NettyServer;

/**
 * ServerBootstrap会启动一个服务提供者,就是NettyServer
 * @author user
 *
 */
public class ServerBootstrap {
	public static void main(String[] args) {
		NettyServer.startServer("127.0.0.1", 7000);
	}
}

HelloServiceImpl.java

package netty.dubborpc.provider;

import netty.dubborpc.publicinterface.HelloService;

public class HelloServiceImpl implements HelloService {
	
	private int count = 0;

	@Override
	public String hello(String msg) {
		System.out.println("收到客户端消息:" + msg);
		//根据msg返回不同的结果
		if (msg != null) {
			return "你好客户端,我已经收到你的消息 [" + msg + "] 第" + (++count) + "次";
		} else {
			return "你好客户端,消息为空";
		}
	}

}

5、服务消费者
ClientBootstrap.java

package netty.dubborpc.consumer;

import netty.dubborpc.netty.NettyClient;
import netty.dubborpc.publicinterface.HelloService;

public class ClientBootstrap {
	
	//这里定义协议头
	public static final String providerName = "HelloService#hello#";
	
	public static void main(String[] args) {
		//创建一个消费者
		NettyClient consumer = new NettyClient();
		
		for (int i=0; i<10; i++) {
			//创建代理对象
			HelloService helloService = (HelloService)consumer.getBean(HelloService.class, providerName);
			
			//通过代理对象调用服务提供者的方法
			String result = helloService.hello("你好 RPC~");
			System.out.println("调用结果 result:" + result);
			System.out.println("-------------------------");
		}
		
	}
}

四、执行结果

1、服务端

服务提供方启动,开始监听了......
msg=HelloService#hello#你好 RPC~
收到客户端消息:你好 RPC~
msg=HelloService#hello#你好 RPC~
收到客户端消息:你好 RPC~
msg=HelloService#hello#你好 RPC~
收到客户端消息:你好 RPC~
msg=HelloService#hello#你好 RPC~
收到客户端消息:你好 RPC~
msg=HelloService#hello#你好 RPC~
收到客户端消息:你好 RPC~
msg=HelloService#hello#你好 RPC~
收到客户端消息:你好 RPC~
msg=HelloService#hello#你好 RPC~
收到客户端消息:你好 RPC~
msg=HelloService#hello#你好 RPC~
收到客户端消息:你好 RPC~
msg=HelloService#hello#你好 RPC~
收到客户端消息:你好 RPC~
msg=HelloService#hello#你好 RPC~
收到客户端消息:你好 RPC~

2、客户端

channelActive 被调用
setParam 被调用
call 被调用 before
channelRead 被调用
call 被调用 after
调用结果 result:你好客户端,我已经收到你的消息 [你好 RPC~] 第1次
-------------------------
setParam 被调用
call 被调用 before
channelRead 被调用
call 被调用 after
调用结果 result:你好客户端,我已经收到你的消息 [你好 RPC~] 第1次
-------------------------
setParam 被调用
call 被调用 before
channelRead 被调用
call 被调用 after
调用结果 result:你好客户端,我已经收到你的消息 [你好 RPC~] 第1次
-------------------------
setParam 被调用
call 被调用 before
channelRead 被调用
call 被调用 after
调用结果 result:你好客户端,我已经收到你的消息 [你好 RPC~] 第1次
-------------------------
setParam 被调用
call 被调用 before
channelRead 被调用
call 被调用 after
调用结果 result:你好客户端,我已经收到你的消息 [你好 RPC~] 第1次
-------------------------
setParam 被调用
call 被调用 before
channelRead 被调用
call 被调用 after
调用结果 result:你好客户端,我已经收到你的消息 [你好 RPC~] 第1次
-------------------------
setParam 被调用
call 被调用 before
channelRead 被调用
call 被调用 after
调用结果 result:你好客户端,我已经收到你的消息 [你好 RPC~] 第1次
-------------------------
setParam 被调用
call 被调用 before
channelRead 被调用
call 被调用 after
调用结果 result:你好客户端,我已经收到你的消息 [你好 RPC~] 第1次
-------------------------
setParam 被调用
call 被调用 before
channelRead 被调用
call 被调用 after
调用结果 result:你好客户端,我已经收到你的消息 [你好 RPC~] 第1次
-------------------------
setParam 被调用
call 被调用 before
channelRead 被调用
call 被调用 after
调用结果 result:你好客户端,我已经收到你的消息 [你好 RPC~] 第1次
-------------------------

五、说明

1、netty客户端和服务端之间是长连接,如果要短连接用socket连服务端就好了,netty客户端做短连接不合适,EventLoopGroup每次创建销毁开销太大了。
2、NettyClientHandler类内部的调用顺序:第一个调用的channelActive(首次时),第二个调用的setParam,第三个调用的call,然后wait,第四个调用channelRead,然后notify,唤醒线程继续在call里执行
3、NettyClientHandler类是所有请求共用的,它的成员变量有并发问题
4、客户端每调用一次,服务端都产生新的HelloServiceImpl对象(这个是自己new出来的)
5、客户端的channelActive只会在连接成功后被调用一次
6、NettyClient类里其实不需要线程池,因为客户端就只有一个,顶多newSingleThreadExecutor。如果真的起多个客户端notify唤醒谁?
7、用户线程想拿到结果需要等待react线程唤醒。context.write方法是在用户线程调用,netty会把这个读封装成task任务丢到react线程。最终发送到服务端,服务端返回结果,客户端在react线程读取
8、客户端NettyClient和NettyClientHandler只有一个实例,是自己new出来的

六、问题

这里demo的客户端有并发和性能问题

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/176475.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

S32G274A spi发送数组值不变问题

官方例程的spi问题 spi发送函数 下面是S32G两个spi从机发送函数 /** * brief SPI/DSPI异步传输。 * 这个函数使用提供的总线参数初始化异步传输 , 通过外部设备。 * param[in] ExternalDevice -指向传输数据的外部设备的指针 * param[in] TxBuffer -发送缓冲区的指针。 * pa…

3.1动态规划--矩阵连乘问题

写在前面&#xff1a;矩阵连乘的要点 1、最优解数组的含义--A[1:n]的最少数乘次数 2、数组的填写方向--斜着填 3、递推方程含义 今天开始动态规划的学习&#xff0c;动态规划与分治法类似&#xff0c;基本思想就是将待求解的问题分成若干子问题&#xff0c;先求解子问题&am…

Java 23种设计模式(2.创建者模式-工厂设计模式)

代码分析 通过代码的不同实现方式&#xff0c;了解工厂模式 代码分析之后有具体的讲解 1.业务和逻辑分开实现 public class Operation {public static double GetResult(double numberA,double numberB,String operate){double result 0;switch (operate){case "":r…

SpringBoot+Vue项目月度员工绩效考核管理系统

文末获取源码 开发语言&#xff1a;Java 框架&#xff1a;springboot JDK版本&#xff1a;JDK1.8 服务器&#xff1a;tomcat7 数据库&#xff1a;mysql 5.7/8.0 数据库工具&#xff1a;Navicat11 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.3.9 浏…

Kubernetes:通过 kubectl 插件 kubectl-tree 查看API对象层级关系

写在前面 分享一个小工具 kubectl-tree&#xff0c;用于查看 k8s API 对象层级关系比如对于有状态应用来讲&#xff0c;可以看到Deployment --> ReplicaSet --> Pod 的构成关系博文内容涉及&#xff1a;tree 插件的安装以及使用。理解不足小伙伴帮忙指正 岂其食鱼&#x…

Java---微服务---Nacos安装

Nacos安装1.Windows安装1.1.下载安装包1.2.解压1.3.端口配置1.4.启动1.5.访问2.Linux安装2.1.安装JDK2.2.上传Nacos安装包2.3.解压2.4.端口配置2.5.启动2.6.访问3.Nacos的依赖1.Windows安装 开发阶段采用单机安装即可。 1.1.下载安装包 在Nacos的GitHub页面&#xff0c;提供…

Java/JavaScript有哪些图形图像处理的框架?

文章目录一个小问题引发的学习热潮其它几个图形库Eclipse GEF框架Java图像库JS 的图形框架图形处理库图像编辑物理引擎流程图/组织图/图编辑等全景图/AR/VR3D库Javascript游戏编程库尾声一个小问题引发的学习热潮 一直对Java图形图像编程念兹在兹&#xff0c;书架上有几本相关…

2、IDEA的卸载与安装

文章目录2、IDEA的卸载与安装2.1 卸载过程2.2 安装前的准备2.3 安装过程2.4注册过程方式一&#xff1a;免费试用30天方式二&#xff1a;官网购买方式三&#xff1a;教育使用2.5 闪退问题【尚硅谷】idea实战教程-讲师&#xff1a;宋红康 生活是属于每个人自己的感受&#xff0c;…

3.1 卷积神经网络的应用领域|卷积的作用|卷积特征值的计算方法|得到特征图表示|步长与卷积核大小对结果的影响|边缘填充方法

文章目录卷积神经网络的应用领域卷积的作用卷积特征值的计算方法得到特征图表示步长与卷积核大小对结果的影响边缘填充方法卷积神经网络的应用领域 检测任务分类与检索超分辨率重构医学任务无人驾驶NVIDIA Tegra X1&#xff08;显卡 GPU&#xff09; 卷积的作用 卷积神经网络…

产品设计-基础控件-信息输出控件

产品设计-基础控件-信息输出控件1.1 走马灯1.1.1 图片轮播样式1.1.2 文字轮播样式1.2 折叠面板1.3 时间轴与步骤条1.3.1 时间轴1.3.2 步骤条1.4标签和徽标1.4.1 标签和徽标1.4.2 徽标1.5 面包屑与查询器1.5.1 面包屑1.5.2 查询器1.6 列表页与详情页1.6.1 列表页1.6.2 详情页1.7…

【实操案例十】函数操作 实例代码及运行效果图!

任务一&#xff1a;Mini计算器 # 任务一&#xff1a;Mini计算器 def calc(a, b, op):if op :return add(a, b)elif op -:return sub(a, b)elif op *:return mul(a, b)elif op /:if b ! 0:return div(a, b)else:return 0不能为除数&#xff01;def add(a, b):return a bde…

这种银行病毒是2022年12月的头号恶意软件

到 2022 年&#xff0c;全球网络攻击同比增长 38%&#xff0c;并且是由更小、更灵活的黑客和勒索软件团伙驱动的。 根据一份报告&#xff0c;全球网络攻击数量在第四季度达到历史新高&#xff0c;平均每个组织每周发生 1,168 次攻击。 现在&#xff0c;一份新报告列出了上个月…

【iMessage苹果推】iOS 当地推送(Local Push) 安装OS CSR文件尽可能多地使每个证书区分开

推荐内容IMESSGAE相关 作者✈️IMEAX推荐内容iMessage苹果推软件 *** 点击即可查看作者要求内容信息作者✈️IMEAX推荐内容1.家庭推内容 *** 点击即可查看作者要求内容信息作者✈️IMEAX推荐内容2.相册推 *** 点击即可查看作者要求内容信息作者✈️IMEAX推荐内容3.日历推 *** …

长短期记忆(LSTM)详解

入门小菜鸟&#xff0c;希望像做笔记记录自己学的东西&#xff0c;也希望能帮助到同样入门的人&#xff0c;更希望大佬们帮忙纠错啦~侵权立删。 ✨完整代码在我的github上&#xff0c;有需要的朋友可以康康✨ ​​​​​​https://github.com/tt-s-t/Deep-Learning.git 目录 一…

【C语言进阶】指针进阶(干货)

目录 一、字符指针 二、指针数组 三、数组指针 1、数组指针的定义 2、&数组名和数组名的区别 3、数组指针的使用 四、数组传参和指针传参 1、一维数组传参 2、一级指针传参 3、二维数组传参 4、二级指针传参 五、函数指针 1、函数指针的定义 2、函数指针的使用 六、…

论文阅读:《Collision Avoidance Testing of the Waymo Automated Driving System》

文章目录1 背景2 方法2.1 Overview2.2 安全测试目标2.2.1 测试目标设定方法&#xff08;Method to Set the Test Objective&#xff09;2.2.2 测试目标度量方法&#xff08;Metrics to Measure the Test Objectives&#xff09;2.3 基于潜在危机情况的测试场景&#xff08;Test…

17种编程语言实现排序算法-希尔排序

开源地址 https://gitee.com/lblbc/simple-works/tree/master/sort/ 覆盖语言&#xff1a;C、C、C#、Java、Kotlin、Dart、Go、JavaScript(JS)、TypeScript(TS)、ArkTS、swift、PHP。 覆盖平台&#xff1a;安卓(Java、Kotlin)、iOS(SwiftUI)、Flutter(Dart)、Window桌面(C#)、…

Unity HurricaneVR 插件中的 VRIK 设置

IK&#xff08;反向动力学&#xff09;有利于提升 VR 应用中的沉浸感&#xff0c;比如我们可以通过对手部的追踪&#xff0c;再结合 IK&#xff0c;来模拟 VR 中人物的手臂和手肘的姿态。 Final IK 是 Unity 一款功能强大的 IK 插件&#xff0c;其中拥有适用于 VR 的 IK 功能&…

基于微信小程序的自驾游拼团小程序

文末联系获取源码 开发语言&#xff1a;Java 框架&#xff1a;ssm JDK版本&#xff1a;JDK1.8 服务器&#xff1a;tomcat7 数据库&#xff1a;mysql 5.7/8.0 数据库工具&#xff1a;Navicat11 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.3.9 浏览器…

逻辑陷阱型蜜罐合约

蜜罐是传统安全领域中的一个概念&#xff0c;通常指安全人员设置一些陷阱&#xff08;比较明显的漏洞&#xff09;&#xff0c;让攻击者自己掉入我们设置好的陷阱中&#xff0c;以便安全人员分析攻击者的作恶手法。蜜罐合约&#xff08;HoneyPots Contract&#xff09;也是类似…