Go 源码之 Chan

news2024/11/24 13:04:56

Go 源码之 chan

go源码之chan - Jxy 博客

目录

  • Go 源码之 chan
    • 一、总结
    • 二、源码
      • (一)hchan
      • (二)创建
      • (三)发送
      • (四)接收
      • (五)关闭
    • 三、常见问题
      • 1.为什么要使用环形队列
      • 2. 关于chan的操作
      • 有劳各位看官 `点赞、关注➕收藏 `,你们的支持是我最大的动力!!!
      • 接下来会不断更新 `golang` 的一些`底层源码及个人开发经验`(个人见解)!!!
      • 同时也欢迎大家在评论区提问、分享您的经验和见解!!!

一、总结

chan 提供了一种在 goroutine 之间进行数据交换和同步的方式。通道可以用于控制并发访问和共享数据,从而减少竞态条件和死锁问题,并且可以自然地处理异步事件和信号。如果你的应用程序需要在 goroutine 之间传递数据或消息,那么通道是一个不错的选择

  • 内部是一个 hchan 结构(字段见源码),环形队列 + 发送者双向链表 + 接收者双向链表 + 锁
  • channel 与 select 语句结合使用时,底层调用的还是 chansendchanrecv 函数
  • channel
    • 结构:环形缓存、sendq、recvq;
    • 流程:上锁/解锁,阻塞/非阻塞,缓冲/非缓冲,缓存入队出队,sudog 入队出队,协程休眠/唤醒

二、源码

/src/runtime/chan.go

  • 一个环形队列
  • 两个双向列表

image-20230323100951975.png

(一)hchan

buf + sendx + recvx 形成环形队列


type hchan struct {
	qcount   uint           // 队列中现存元素数量
	dataqsiz uint           // 队列容量(缓冲区)
	buf      unsafe.Pointer // 队列,指向一个动态分配的数组,用于存储 channel 中的元素
	elemsize uint16         // 队列中元素大小
	closed   uint32         // 0 正常	,1 关闭
	elemtype *_type     		// 队列中元素类型,
  
	sendx    uint   				// 队列(buf)已发送位置,当(sendx++)==dataqsiz,则从头开始发,sendx=0
	recvx    uint   				// 队列(buf)已接收位置;
  												// 当 `sendx` 和 `recvx` 相等时,channel 中无元素,发送 / 接收 操作阻塞
  
	recvq    waitq  				// 双向链表 ,FIFO 由 recv 行为(也就是读 <-ch)阻塞在 channel 上的 goroutine 队列
	sendq    waitq 					// 双向链表 ,FIFO 由 send 行为 (也就是写 ch<-) 阻塞在 channel 上的 goroutine 队列

	
	lock mutex 							// 读写锁,保护hchan中的所有字段,以及waitq中所有的字段
}

// 双向链表,存储了g
type waitq struct {
	first *sudog  // 链表头部,协程 g 的数据结构
	last  *sudog  // 链表尾部,协程 g 的数据结构
}

(二)创建

ch1 := make(chan int)
ch2 := make(chan int,2)

底层都是调用了runtime.makechan()

  • 合法性校验
    • 数据类型大小校验
    • 内存溢出校验
  • 初始化 hchan
    • 初始化 无缓冲 hchan
    • 初始化 有缓冲 && 无指针元素 hchan
    • 初始化 无缓冲 && 有指针元素 hchan
    • 初始化 hchan 其他元素:如 dataqsize、elemsize、elemtype、lock
// 主要逻辑:合法性验证 和 分配地址空间
// t 是指向 chantype 的指针,size 表示缓冲区大小,0表无缓冲
func makechan(t *chantype, size int) *hchan {
	elem := t.elem // 元素的类型

  // ----------- 1. 合法性验证 ----------
	// 数据类型大小验证,大于1<<16时异常
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
  // 内存对齐(降低寻址次数),大于最大内存(8字节数)时异常
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}

  // 传入的size大于堆可分配的最大内存时:内存溢出异常
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

  // ----------- 2. 分配地址空间 ----------
  // hchanSize 为 hchan 结构大小
  // mem 为缓存区大小
  /* 根据 channel 中收发元素的类型和缓冲区的大小初始化 runtime.hchan 和 缓冲区,分为三种情况:
  		* 如果不存在缓冲区,分配 hchan 结构体空间,即无缓存 channel
  		* 如果 channel 存储的类型不是指针类型,分配连续地址空间,包括 hchan 结构体 + 数据
  		* 默认情况包括指针,为 hchan 和 buf 单独分配数据地址空间
  	更新 hchan 结构体的数据,包括 elemsize、elemtype 和 dataqsiz
	*/
	var c *hchan
	switch {
    case mem == 0:
    	 // 创建无缓冲的 chan ,buf==0 ,初始化 hchan
       c = (*hchan)(mallocgc(hchanSize, nil, true)) // hchanSize表示空hchan需要占用的字节
       c.buf = c.raceaddr()  //  raceaddr内部实现为:return unsafe.Pointer(&c.buf)
    case elem.ptrdata == 0:
       // 有缓存区,并且队列中不存在指针,分配连续地址空间,大小为 hchanSize + mem
       c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
       // buf指针指向空hchan占用空间的末尾
       c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
       // 队列包含指针类型
       // 为buf单独开辟mem大小的空间,用来保存所有的数据
       c = new(hchan)
       c.buf = mallocgc(mem, elem, true)
    }

	c.elemsize = uint16(elem.size)  			// 元素大小
	c.elemtype = elem 										// 元素类型
	c.dataqsiz = uint(size) 							// chan 缓存区大小
	lockInit(&c.lock, lockRankHchan) 			// 初始化锁

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
	}
	return c
}

(三)发送

ch <- 1

执行 runtime.chansend1(SB)

  • 异常检查
    • 发送到 nil chan 中,阻塞挂起
    • 往 closed chan 发送(写),则 panic
    • 当前 chan 是否可以发送
  • 同步发送:recvq 中存在等待接收者,则直接唤醒并发送数据
  • 异步发送:c.qcount < c.dataqsiz 缓存区空闲,则数据发送到缓存区
  • 阻塞发送:当前面都不满足时 且 block = true 时:发送操作 线程阻塞 挂起,并且添加到 sendq 等待队列,直到有接收者接收才释放
/**
* @Description: 
 chansend函数主要可以归纳为四部分:
	 异常检查、同步发送、异步发送、阻塞发送:
* @Param:c:hchan结构;ep:发送的元素;block:是否阻塞;callerpc:
* @return: true发送成功,false发送失败
*/
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  // ------------------------------------ 1.异常检查 ------------------------------------
	if c == nil { 
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) 	// 发送到 nil chan 中,阻塞挂起
		throw("unreachable")
	}
	..........

 																							 // 当channel不为nil,此时检查channel是否做好接收发送操作的准备,
  if !block && c.closed == 0 && full(c) { 	
		return false															// 非阻塞且未关闭: 1. 无缓存区,recvq为空 2. 有缓冲区,但是buffer已满
	}


	lock(&c.lock) // 先上锁

	if c.closed != 0 { // chan已经关闭,则解锁
		unlock(&c.lock)
		panic(plainError("send on closed channel")) 				// 往 closed chan 发送(写),则 panic
	}

  // ------------------------------------  2.同步发送 ------------------------------------
  																										// recvq 中存在等待接收者,则直接唤醒并发送数据
	if sg := c.recvq.dequeue(); sg != nil { 
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)		// recvq 等待队列取取出 sg(sudog)并唤醒并发送数据 ep
		return true
	}
   
  // ------------------------------------  3.异步发送 ------------------------------------
  // (有缓存区,没有等待接收者,先发到缓冲区中,等有接收者再去读)
	if c.qcount < c.dataqsiz {  					// 存在的元素个数< 缓冲区:说明缓存区可以继续写数据
		qp := chanbuf(c, c.sendx) 					// 获取缓存区index地址
		typedmemmove(c.elemtype, qp, ep)		// 数据写入buffer
		c.sendx++ 													// 发送数据的下标++
		if c.sendx == c.dataqsiz { 					// 当发送数据的下标等于缓冲区,表数据发送完毕,从头开始
			c.sendx = 0
		}
		c.qcount++ 													// 元素数量++
		unlock(&c.lock) 										// 解锁
		return true 												// 返回结果
	}

	if !block {
		unlock(&c.lock) 										// 解锁
		return false
	}

  // ------------------------------------ 4. 阻塞发送 ------------------------------------
  // 当前面都不满足时(没有等待接收者,没有空闲缓冲区) 且 block = true 时,发送操作 线程阻塞 挂起,直到有接收者接收才释放:
	gp := getg()
	..........
  c.sendq.enqueue(mysg)  											// 将发送 的 sg 添加到 sendq 等待队列中
	return true
}
func full(c *hchan) bool {
	if c.dataqsiz == 0 { // 无缓冲
		return c.recvq.first == nil
	}
	// 有缓冲,现有元素的个数 是否等于 缓冲区容量时(缓冲区满)
	return c.qcount == c.dataqsiz
}
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   ......
   if sg.elem != nil {
      sendDirect(c.elemtype, sg, ep) //  将数据拷贝到接收变量的内存地址上
      sg.elem = nil
   }
   gp := sg.g
   unlockf()
   gp.param = unsafe.Pointer(sg)
   sg.success = true
   if sg.releasetime != 0 {
      sg.releasetime = cputicks()
   }
   goready(gp, skip+1) // 唤醒sudog协程;下一轮调度时会唤醒这个接收的 goroutine。
}
// 实现了等待队列的入队操作。它将一个元素添加到等待队列的末尾,并更新队列的 first 和 last 指针
func (q *waitq) enqueue(sgp *sudog) {
	sgp.next = nil // 表示该元素是队列的最后一个
	x := q.last // 将等待队列 q 中的最后一个元素(如果存在)赋值给变量 x。
	if x == nil { // 如果队列中最后一个都没有,则队列无元素,即 x 为 nil,
		sgp.prev = nil // 则将 sgp 元素的 prev 指针设为 nil,表示该元素是队列中的第一个元素,
		q.first = sgp //  然后将队列的 first 和 last 指针都指向该元素,表示该元素是队列中唯一的元素。
		q.last = sgp  // 然后直接返回,结束入队操作。
		return
	}
	sgp.prev = x // sgp 是新加的最后元素,需要关联前一个元素(x为原队列中最后一个元素)
	x.next = sgp // 设置x的的下一个元素为新加的元素
	q.last = sgp // 设置q队列的最后一个元素
}

(四)接收

i <- ch i, ok <- ch

执行 runtime.chanrecv1(SB) 都是调用的chanrecv()

  • 异常检查
    • 从 nil chan 中读取,阻塞挂起
    • 从 closed chan 接收(读),返回零值
    • 当前 chan 是否可以接收
  • 同步接收:sendq 中存在发送者,则直接唤醒并接收数据
  • 异步接收:c.qcount 队列中有元素,则则从 buf 中读取数据
  • 阻塞接收:当前面都不满足时 且 block = true 时:接收操作 线程阻塞 挂起,并且添加到 recvq 等待队列,直到有发送者才释放
chanrecv 函数的逻辑和 chansend 的逻辑基本一致

(五)关闭

close(ch)

closechan(c *hchan)

主要逻辑:

  • 异常检查:
    • 关闭 nil chan ,panic
    • 关闭 closed chan,panic
  • 标记 chan 为关闭状态
  • 释放等待的 sudog: 唤醒并调度等待队列 recvq、sendq 中的 sudog,所有接收者收到零值

func closechan(c *hchan) {
  // ------------------------------------ 1. 异常检查 ------------------------------------
	if c == nil {
		panic(plainError("close of nil channel")) 				// 关闭 nil chan ,panic
	}

	lock(&c.lock)																			// 上锁
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))		// 关闭 closed chan,panic
	}

	c.closed = 1																			// 标识chan已经关闭

   // ------------------------------------ 2. 释放等待的 sudog ------------------------------------
	var glist gList 																	// 存储 recvq、sendq 等待队列中的 sg(sudog)
	for {
		sg := c.recvq.dequeue()													// 将 recvq 等待队列中的  sg(sudog) 添加到 glist
		......
		glist.push(gp)
	}
	for {
		sg := c.sendq.dequeue()													// 将 sendq 等待队列中的  sg(sudog) 添加到 glist
		......
		glist.push(gp)
	}
	unlock(&c.lock)																		// 解锁

  for !glist.empty() {															//依次从 glist 中弹出 sg(sudog)并唤醒执行,所有接收者收到零值
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
}

三、常见问题

1.为什么要使用环形队列

chan的内部使用环形队列来存取元素,每次发/收元素时,会根据sendx/recvx记录的位置从队列buf中存取元素,

所以环形队列:buf+sendx+recvx实现的,

使用环形数组实现的好处:

  • 避免对数组进行复制或者移动操作

    比如数组【1,2,3】,现在添加4,变为【2,3,4】,数组就需要进行复制拷贝操作,如果是环形队列,则直接将4添加到队列的尾部即可,

  • 避免内存分配和拷贝的开销,从而提高程序的性能

    重复利用,避免重新分配内存

2. 关于chan的操作

image-20230328143228214.png

有劳各位看官 点赞、关注➕收藏 ,你们的支持是我最大的动力!!!

接下来会不断更新 golang 的一些底层源码及个人开发经验(个人见解)!!!

同时也欢迎大家在评论区提问、分享您的经验和见解!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1563975.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[Linux] 排查问题指令top/ps/netstat

在Linux下查看某个端口运行的指令 1. 首先通过netstat来查看端口对应的进程号 比如抓取端口53这个DNS服务的进程 netstat -tulnp | grep 53 可以看到53这个端口号对应的pid是720 2. 通过ps指令来对进程号执行的命令查询 ps aux | grep 720 可以看到pid为720这个进程对应的执…

Android APP代码混淆技术解析与实战指南

Android APP 加固是优化 APK 安全性的一种方法&#xff0c;常见的加固方式有混淆代码、加壳、数据加密、动态加载等。下面介绍一下 Android APP 加固的具体实现方式。 混淆代码 使用 ipaguard工具可以对代码进行混淆&#xff0c;使得反编译出来的代码很难阅读和理解&#xff…

【中文视觉语言模型+本地部署 】23.08 阿里Qwen-VL:能对图片理解、定位物体、读取文字的视觉语言模型 (推理最低12G显存+)

项目主页&#xff1a;https://github.com/QwenLM/Qwen-VL 通义前问网页在线使用——&#xff08;文本问答&#xff0c;图片理解&#xff0c;文档解析&#xff09;&#xff1a;https://tongyi.aliyun.com/qianwen/ 论文v3. : 一个全能的视觉语言模型 23.10 Qwen-VL: A Versatile…

CentOS7安装Flink1.17伪分布式

前提条件 拥有1台CentOS7 CentOS7安装好jdk&#xff0c;官方文档要求java 11&#xff0c;使用java 8也可以。可参考 CentOS7安装jdk8 下载安装包 下载安装包 [hadoopnode1 ~]$ cd installfile/ [hadoopnode1 installfile]$ wget https://archive.apache.org/dist/flink/flin…

Stream流,线程

文章目录 Stream流思想作用三类方法获取方法单列集合(Collection[List,Set双列集合Map(不能直接获取)数组同一类型元素(Stream中的静态方法) 常见的中间方法终结方法收集方法 Optional类 线程相关概念多线程概念实现方式继承Thread类实现Runnable接口比较 常用方法线程安全产生…

【现代控制】倒立摆模型

基础公式 转动惯量&#xff1a; 欧拉拉格朗日等式 倒立摆模型建立 由拉格朗日等式推导出微分方程&#xff1a; 也就是 将zdot移到等式左边&#xff0c;化简得到 展开就是&#xff1a; 系统线性化 法一&#xff1a;雅可比矩阵 法二&#xff1a;小角度假设 化简最终得…

nslookup查询网站是否支持IPV6

nslookup是一种网络管理命令行工具&#xff0c;可用于查询DNS域名和IP地址输入指令nslookup默认服务器和Address是当前上网所用的DNS服务器域名和地址A记录A&#xff08;Address&#xff09;记录指的是用来指定主机名或域名对应的IP记录。

OpenHarmony实战:轻量级系统之子系统移植概述

OpenHarmony系统功能按照“系统 > 子系统 > 部件”逐级展开&#xff0c;支持根据实际需求裁剪某些非必要的部件&#xff0c;本文以部分子系统、部件为例进行介绍。若想使用OpenHarmony系统的能力&#xff0c;需要对相应子系统进行适配。 OpenHarmony芯片适配常见子系统列…

2024春招冲刺题单 ONT68 最接近的三数之和【中等 数组,递归 Java,Go,PHP】

题目 题目链接&#xff1a; https://www.nowcoder.com/practice/f889497fd1134af5af9de60b4d13af23 相同题目: https://www.lintcode.com/problem/59 思路 本题答案是n数之和相关问题的模板。参考答案Java import java.util.*;public class Solution {/*** 代码中的类名、方法…

【C语言】“vid”Microsoft Visual Studio安装及应用(检验内存泄露)

文章目录 前言安装包获取配置VLD完成 前言 我们在写代码时往往容易存在内存泄漏的情况&#xff0c;所以存在这样一个名为VLD的工具用来检验内存泄漏&#xff0c;现在我来教大家安装一下 安装包获取 vld下载网址&#xff1a;https://github.com/KindDragon/vld/releases/tag/…

【GPT5进展】GPT-5将于今年年中发布

OpenAI即将发布的GPT-5代表了人工智能技术的一个重大进步&#xff0c;这一新一代模型预计将进一步扩大OpenAI在AI应用领域的影响力。以下是关于GPT-5的几个关键点&#xff0c;旨在清晰、简洁地向读者传达这一重要更新&#xff1a; 1. 性能和功能的实质性提升 GPT-5在性能上做…

读取信息boot.bin和xclbin命令

bootgen读Boot.bin命令 johnjohn-virtual-machine:~/project_zynq/kv260_image_ubuntu22.04$ bootgen -read BOOT-k26-starter-kit-202305_2022.2.bin xclbinutil读xclbin命令 johnjohn-virtual-machine:~/project_zynq/kv260_image_ubuntu22.04$ xclbinutil -i kv260-smartca…

2024 年每个程序员都应该尝试的 8 个AI工具

随着人工智能技术的极速发展&#xff0c;新的 AI 工具正以前所未有的速度涌现&#xff0c;为开发者们带来了前所未有的机会和挑战。在这个不断演进的时代&#xff0c;掌握最新的 AI 技术已成为每个程序员的必修课。 在本文中&#xff0c;我们收集了8 个程序员在 2024 年值得尝…

函数调用实现小米汽车智能语音助手

上周小米汽车发布&#xff0c;其中有一个特色功能就是智能语音&#xff0c;小爱同学整合了语音大模型&#xff0c;实现智能座舱体验。 雷老板的PPT也演示了&#xff0c;一些口语化的对话就能触发各种指令&#xff0c;无论是开空调、播放音乐&#xff0c;还是找手机、识别前方汽…

vulnhub pWnOS v2.0通关

知识点总结&#xff1a; 1.通过模块来寻找漏洞 2.msf查找漏洞 3.通过网站源代码&#xff0c;查看模块信息 环境准备 攻击机&#xff1a;kali2023 靶机&#xff1a;pWnOS v2.0 安装地址&#xff1a;pWnOS: 2.0 (Pre-Release) ~ VulnHub 在安装网址中看到&#xff0c;该靶…

axios 封装 http 请求详解

前言 Axios 是一个基于 Promise 的 HTTP 库&#xff0c;它的概念及使用方法本文不过多赘述&#xff0c;请参考&#xff1a;axios传送门 本文重点讲述下在项目中是如何利用 axios 封装 http 请求。 一、预设全局变量 在 /const/preset.js 中配置预先设置一些全局变量 window.…

【Qt 学习笔记】Qt 背景介绍

博客主页&#xff1a;Duck Bro 博客主页系列专栏&#xff1a;Qt 专栏关注博主&#xff0c;后期持续更新系列文章如果有错误感谢请大家批评指出&#xff0c;及时修改感谢大家点赞&#x1f44d;收藏⭐评论✍ Qt 背景介绍 文章编号&#xff1a;Qt 学习笔记 / 01 文章目录 Qt 背景…

vue 条件渲染、列表循环渲染、事件绑定 初探第三天

条件渲染 <script>const app Vue.createApp({data(){return {show:true,conditionOne: false,conditionTwo: true,}},template:<div v-if"show"> hello word </div><div v-if"conditionOne"> if </div><div v-else…

[lesson02]C到C++的升级

C到C的升级 C与C的关系 C继承了所有的C特性C在C的基础上提供了更多的语法和特性C的设计目标是运行效率与开发效率的统一 C到C的升级 C更强调语言的实用性 所有的变量都可以在需要使用时再定义 int c 0; for (int i 1; i < 3; i) {for(int j 1; j < 3; j){c i * …

Kubernetes(k8s):部署、使用 metrics-server

Kubernetes&#xff08;k8s&#xff09;&#xff1a;部署、使用 metrics-server 一、metrics-server简介二、部署metrics-server2.1、 下载 Metrics Server 部署文件2.2、修改metrics-server.yaml 文件2.3、 部署 Metrics Server2.4、 检查 Metrics Server 三、使用 Metrics Se…