Golang RPC实现-day02

news2024/12/25 23:16:43

导航

  • Golang RPC实现
    • 一、客户端异步并发多个请求
      • 1、 客户端结构体
      • 2、 一个客户端,异步发送多个请求,使用`call`结构体代表客户端的每次请求
      • 3、客户端并发多个请求
      • 4、客户端接收请求

Golang RPC实现

  • day01 我们实现了简单的服务端客户端
  • 我们简单总结一下day01的模式。
  • 服务端按顺序处理客户端过来的请求,按顺序响应客户端的请求。
  • 客户端同步的方式发送请求,不能并发发出请求。
  • 那么我们day02干的事情就是,让客户端异步并发的发出请求(请求顺序变得随机),服务端依然是按请求顺序进行处理,处理完某一个请求就返回,可以不按请求的顺序响应数据,但是响应数据是要上锁的,否则会发生响应数据并发安全问题。
  • 主要逻辑是修改了客户端的代码,服务端和day01没有变化

一、客户端异步并发多个请求

1、 客户端结构体

type Client struct {
	cc       codec.Codec//编码方式
	opt      *Option//发出请求的第一个包,用来协商后续包的格式和编码方式
	sending  sync.Mutex // 当一个请求正在发送时,不可以转头去执行别的请求
	header   codec.Header // 请求头内容
	mu       sync.Mutex // protect following
	seq      uint64 //记录该客户端一次请求连接的序号,
	pending  map[uint64]*Call//通过seq快速找到客户端的某个请求
	closing  bool // user has called Close
	shutdown bool // server has told us to stop
}

2、 一个客户端,异步发送多个请求,使用call结构体代表客户端的每次请求

type Call struct {
	Seq           uint64	//当前请求的序号,唯一标识一个请求
	ServiceMethod string      // format "<service>.<method>" 此次请求的服务和方法
	Args          interface{} // arguments to the function 请求函数的参数
	Reply         interface{} // reply from the function 服务端函数的响应数据
	Error         error       // if error occurs, it will be set //发生错误时的信息
	Done          chan *Call  // Strobes when call is complete.完成一次请求通过chan来通知
}

3、客户端并发多个请求

  • 主函数逻辑
func main() {
	log.SetFlags(0)
	addr := make(chan string)
	go startServer(addr)
	client, _ := geerpc.Dial("tcp", <-addr)
	defer func() { _ = client.Close() }()

	time.Sleep(time.Second)
	// send request & receive response
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {//go 实现异步非阻塞发送多个请求
			defer wg.Done()
			args := fmt.Sprintf("geerpc req %d", i)//一次请求携带的数据
			var reply string
			if err := client.Call("Foo.Sum", args, &reply); err != nil {//call发出一次请求,&reply,传的是引用,如果有响应,就能接收到
				log.Fatal("call Foo.Sum error:", err)
			}
			log.Println("reply:", reply)
		}(i)
	}
	wg.Wait()
}
  • Call 准备发出一次请求
// Call invokes the named function, waits for it to complete,
// and returns its error status.
func (client *Client) Call(serviceMethod string, args, reply interface{}) error {
	call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done//阻塞等待此次请求的channel,直到服务端处理并响应才返回
	return call.Error
}
  • 绑定数据到请求中
// Go invokes the function asynchronously.
// It returns the Call structure representing the invocation.
func (client *Client) Go(serviceMethod string, args, reply interface{}, done chan *Call) *Call {
	if done == nil {
		done = make(chan *Call, 10)
	} else if cap(done) == 0 {
		log.Panic("rpc client: done channel is unbuffered")
	}
	call := &Call{
		ServiceMethod: serviceMethod,//此次请求的服务和方法
		Args:          args,//此次请求的参数
		Reply:         reply,//此处是引用类型,暂时还没有数据,等服务端响应就有数据了
		Done:          done,//绑定此次请求的响应channel,服务端响应后就往对应的channel发一条数据
	}
	client.send(call)
	return call
}
  • 发送请求到服务端
func (client *Client) send(call *Call) {
	// make sure that the client will send a complete request
	client.sending.Lock()
	defer client.sending.Unlock()

	// register this call.
	seq, err := client.registerCall(call)//注册这次call,把这次的请求ID注册到客户端中。。。
	if err != nil {
		call.Error = err
		call.done()
		return
	}

	// prepare request header
	client.header.ServiceMethod = call.ServiceMethod
	client.header.Seq = seq
	client.header.Error = ""

	// encode and send the request
	if err := client.cc.Write(&client.header, call.Args); err != nil {//发送请求头和请求参数
		call := client.removeCall(seq)
		// call may be nil, it usually means that Write partially failed,
		// client has received the response and handled
		if call != nil {
			call.Error = err
			call.done()
		}
	}
}

4、客户端接收请求

func (client *Client) receive() {
	var err error
	for err == nil {
		var h codec.Header
		if err = client.cc.ReadHeader(&h); err != nil { 接收请求是一个个来,当连接关闭时,此处会报错,退出整个客户端
			break
		}
		call := client.removeCall(h.Seq)//通过Seq唯一标识符删除一个请求
		switch {
		case call == nil:
			// it usually means that Write partially failed
			// and call was already removed.
			err = client.cc.ReadBody(nil)

		case h.Error != "":
			call.Error = fmt.Errorf(h.Error)
			err = client.cc.ReadBody(nil)
			call.done()
		default:
			err = client.cc.ReadBody(call.Reply)
			if err != nil {
				call.Error = errors.New("reading body " + err.Error())
			}
			call.done()//向通道发送一条消息,客户端等待的这个call可以推出了
		}
	}
	// error occurs, so terminateCalls pending calls
	client.terminateCalls(err)//关闭所有请求
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1680261.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

26 分钟惊讶世界,GPT-4o 引领未来人机交互

前言 原文链接&#xff1a;OpenAI最新模型——GPT-4o&#xff0c;实时语音视频交互&#xff0c;未来人机交互近在眼前 - Kaiho小站 北京时间 5 月 14 日凌晨&#xff0c;OpenAI 发布新一代模型——GPT-4o&#xff0c;仅在 ChatGPT 面世 17 个月后&#xff0c;OpenAI 再次通过…

985大学电子信息专硕,考C语言+数据结构!中央民族大学25计算机考研考情分析!

中央民族大学&#xff08;Minzu University of China&#xff09;坐落于北京市学府林立的海淀区&#xff0c;南邻国家图书馆&#xff0c;北依中关村科技园&#xff0c;校园环境典雅&#xff0c;古朴幽美&#xff0c;人文氛围浓郁&#xff0c;具有鲜明的民族特色。由北京市、国家…

ubuntu下不生成core dumped

1、先用ulimit -c&#xff0c;如果看到0&#xff0c;说明没有开core dump。 所以我们输入ulimit -c unlimited&#xff0c;打开core dump。 再次用ulimit -c&#xff0c;看到unlimited了&#xff0c;说明core dump打开了。 注意这句ulimit -c unlimited只对当前会话有效。要永…

通俗易懂讲乐观锁与悲观锁

浅谈乐观锁与悲观锁 乐观锁和悲观锁是Java并发编程中的两个概念。使用乐观锁和悲观锁可以解决并发编程中数据不一致性、死锁、性能差等问题&#xff0c;乐观锁与悲观锁的实行方式不同&#xff0c;所以其特性也不近相同&#xff0c;下文将详细介绍两者的特性与适用场景。 《熊…

STM32-09-IWDG

文章目录 STM32 IWDG1. IWDG2. IWDG框图3. IWDG寄存器4. IWDG寄存器操作步骤5. IWDG溢出时间计算6. IWDG配置步骤7. 代码实现 STM32 IWDG 1. IWDG IWDG Independent watchdog&#xff0c;即独立看门狗&#xff0c;本质上是一个定时器&#xff0c;这个定时器有一个输出端&#…

ZYNQ之嵌入式驱动开发——字符设备驱动

文章目录 Linux驱动程序分类Linux应用程序和驱动程序的关系简单的测试驱动程序在petalinux中添加LED驱动新字符设备驱动 Linux驱动程序分类 驱动程序分为字符设备驱动、块设备驱动和网络设备驱动。 字符设备是按字节访问的设备&#xff0c;比如以一个字节收发数据的串口&#…

谷歌全力反击 OpenAI:Google I/O 2024 揭晓 AI 新篇章,一场激动人心的技术盛宴

&#x1f680; 谷歌全力反击 OpenAI&#xff1a;Google I/O 2024 揭晓 AI 新篇章&#xff0c;一场激动人心的技术盛宴&#xff01; 在这个人工智能的全新时代&#xff0c;只有谷歌能让你眼前一亮&#xff01;来自全球瞩目的 Google I/O 2024 开发者大会&#xff0c;谷歌用一场…

项目组GIT操作规范

分支规范 在开发过程中&#xff0c;一般会存在以下几种分支&#xff1a; main分支(master) master为主分支&#xff0c;也是用于部署生产环境的分支&#xff0c;一般由 dev 以及 fixbug分支合并&#xff0c;任何时间都不能直接修改代码。dev分支 develop 为开发分支&#xff…

Altium Designer封装库和元器件符号库下载与导入教程(SnapEDA 、Ultra Librarian、Alldatasheetcn)

1.AD封装库和元器件符号库下载网址 以下是一些全球热门的Altium Designer封装库和元器件符号库下载网址推荐&#xff1a; Altium Content Vault (现称为Altium Manufacturer Part Search)&#xff1a;这是Altium官方提供的元器件库&#xff0c;可以直接在Altium Designer中使用…

Java码农的福音:再也不怕乱码了

即便是Java这样成熟的语言&#xff0c;开发者们也常常会遇到一个恼人的问题——乱码。 本文将深入探讨乱码的根本原因&#xff0c;并针对Java开发中的乱码场景提出有效的解决方案&#xff0c;辅以实战代码&#xff0c;让Java程序员从此告别乱码困扰。 一&#xff0c;字符集的…

文件存储解决方案-阿里云OSS

文章目录 1.菜单分级显示问题1.问题引出1.苹果灯&#xff0c;放到节能灯下面也就是id大于1272.查看菜单&#xff0c;并没有出现苹果灯3.放到灯具下面id42&#xff0c;就可以显示 2.问题分析和解决1.判断可能出现问题的位置2.找到递归返回树形菜单数据的位置3.这里出现问题的原因…

什么是最大路径?什么是极大路径?

最近学习中&#xff0c;在这两个概念上出现了混淆&#xff0c;导致了一些误解&#xff0c;在此厘清。 最大路径 在一个简单图G中&#xff0c;u、v之间的距离 d ( u , v ) min ⁡ { u 到 v 的最短路的长度 } d(u,v) \min \{ u到v的最短路的长度 \} d(u,v)min{u到v的最短路的…

音乐的力量

常听音乐的好处可以让人消除工作紧张、减轻生活压力、避免各类慢性疾病等等&#xff0c;其实这些都是有医学根据的。‍ 在医学研究中发现&#xff0c;经常的接触音乐节 奏、旋律会对人体的脑波、心跳、肠胃蠕动、神经感应等等&#xff0c;产生某些作用&#xff0c;进而促进身心…

Postman基础功能-接口返回值获取

大家好&#xff0c;之前给大家分享关于Postman的接口关联&#xff0c;我们平时在做接口测试时&#xff0c;请求接口返回的数据都是很复杂的 JSON 数据&#xff0c;有着多层嵌套&#xff0c;这样的数据层级在 Postman 中要怎么获取呢&#xff1f; 接下来给大家展示几个获取 JSO…

容联云零代码平台容犀desk:重新定义坐席工作台

在数智化浪潮的推动下&#xff0c;企业亟待灵活适应市场变化、快速响应客户需求&#xff0c;同时还要控制成本并提升效率&#xff0c;传统的软件开发模式因开发周期长、成本高、更新迭代慢等问题&#xff0c;逐渐难以满足企业灵活多变的业务需求。 容犀Desk&#xff0c;观察到…

(1)双指针算法介绍与练习:移动零

目录 双指针算法介绍 练习&#xff1a;移动零 双指针算法介绍 双指针算法常见于数组和双向链表的题型 在数组中&#xff0c;双指针中的指针代表数组元素的下标&#xff0c;而不是真正的指针类型变量 在双向链表中&#xff0c;双指针中的指针即为真正意义上的指针&#xff…

Windows安装Django

1、下载Python程序包 Python程序包官网下载地址Download Python | Python.org,若下载最新版本&#xff0c;有最新版本则下载"Windows installer (64-bit)" 若是下载其他版本,可在下图位置找到相应的版本,然后点击Download.如下图所示&#xff1a; 打开后查看注意事项…

开源连锁收银系统哪个好

针对开源连锁收银系统的选择&#xff0c;商淘云是一个备受关注的候选。商淘云以其功能丰富、易于定制和稳定性等优势&#xff0c;吸引了众多企业和开发者的关注。下面将从四个方面探讨商淘云开源连锁收银系统的优势&#xff1a; 首先&#xff0c;商淘云提供了丰富的功能模块。作…

Retrying,一个神奇优雅的 Python 库

大家好&#xff01;我是爱摸鱼的小鸿&#xff0c;关注我&#xff0c;收看每期的编程干货。 一个简单的库&#xff0c;也许能够开启我们的智慧之门&#xff0c; 一个普通的方法&#xff0c;也许能在危急时刻挽救我们于水深火热&#xff0c; 一个新颖的思维方式&#xff0c;也许能…

Docker安装Mosquitto

在物联网项目中&#xff0c;我们经常用到MQTT协议&#xff0c;用MQTT协议做交互就需要部署一个MQTT服务&#xff0c;而mosquitto是一个常用的MQTT应用服务&#xff0c; Mosquitto是一个实现了消息推送协议MQTT v3.1的开源消息代理软件。MQTT&#xff08;Message Queuing Teleme…