go Channel原理 (三)

news2024/11/19 7:29:46

Channel

设计原理

不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存。

在主流编程语言中,多个线程传递数据的方式一般都是共享内存。
在这里插入图片描述
Go 可以使用共享内存加互斥锁进行通信,同时也提供了一种不同的并发模型,即通信顺序进程(Communicating sequential processes,CSP)。Goroutine 和 Channel 分别对应 CSP 中的实体和传递信息的媒介,Goroutine 之间会通过 Channel 传递数据。在这里插入图片描述
上图中的两个 Goroutine,一个会向 Channel 中发送数据,另一个会从 Channel 中接收数据,它们两者能够独立运行并不存在直接关联,但是能通过 Channel 间接完成通信。

接收数据

两个 Goroutine,一个会向 Channel 中发送数据,另一个会从 Channel 中接收数据,它们两者能够独立运行并不存在直接关联,但是能通过 Channel 间接完成通信。这是一个 生产者 - 消费者 模型,负责接收数据的 goroutine 从 channel 读取一个消息进行消费,channel 起到一个临界区/缓冲区的作用。

// chanrecv 函数接收 channel c 的元素并将其写入 ep 所指向的内存地址。
// 如果 ep 是 nil,说明忽略了接收值。
// 如果 block == false,即非阻塞型接收,在没有数据可接收的情况下,返回 (false, false)
// 否则,如果 c 处于关闭状态,将 ep 指向的地址清零,返回 (true, false)
// 否则,用返回值填充 ep 指向的内存地址。返回 (true, true)
// 如果 ep 非空,则应该指向堆或者函数调用者的栈

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
   // 省略 debug 内容 …………

   // 如果是一个 nil 的 channel
   if c == nil {
      // 如果不阻塞,直接返回 (false, false)
      if !block {
         return
      }
      // 否则,接收一个 nil 的 channel,goroutine 挂起
      gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
      // 不会执行到这里
      throw("unreachable")
   }

   // 在非阻塞模式下,快速检测到失败,不用获取锁,快速返回
   // 当我们观察到 channel 没准备好接收:
   // 1. 非缓冲型,等待发送列队 sendq 里没有 goroutine 在等待
   // 2. 缓冲型,但 buf 里没有元素
   // 之后,又观察到 closed == 0,即 channel 未关闭。
   // 因为 channel 不可能被重复打开,所以前一个观测的时候 channel 也是未关闭的,
   // 因此在这种情况下可以直接宣布接收失败,返回 (false, false)
   if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
      c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
      atomic.Load(&c.closed) == 0 {
      return
   }

   var t0 int64
   if blockprofilerate > 0 {
      t0 = cputicks()
   }

   // 加锁
   lock(&c.lock)

   // channel 已关闭,并且循环数组 buf 里没有元素
   // 这里可以处理非缓冲型关闭 和 缓冲型关闭但 buf 无元素的情况
   // 也就是说即使是关闭状态,但在缓冲型的 channel,
   // buf 里有元素的情况下还能接收到元素
   if c.closed != 0 && c.qcount == 0 {
      if raceenabled {
         raceacquire(unsafe.Pointer(c))
      }
      // 解锁
      unlock(&c.lock)
      if ep != nil {
         // 从一个已关闭的 channel 执行接收操作,且未忽略返回值
         // 那么接收的值将是一个该类型的零值
         // typedmemclr 根据类型清理相应地址的内存
         typedmemclr(c.elemtype, ep)
      }
      // 从一个已关闭的 channel 接收,selected 会返回true
      return true, false
   }

   // 等待发送队列里有 goroutine 存在,说明 buf 是满的
   // 1. 非缓冲型的 channel。直接进行内存拷贝(从 sender goroutine -> receiver goroutine)
   // 2. 缓冲型的 channel,但 buf 满了。接收到循环数组头部的元素,并将发送者的元素放到循环数组尾部
   if sg := c.sendq.dequeue(); sg != nil {
      // Found a waiting sender. If buffer is size 0, receive value
      // directly from sender. Otherwise, receive from head of queue
      // and add sender's value to the tail of the queue (both map to
      // the same buffer slot because the queue is full).
      recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return true, true
   }

   // 缓冲型,buf 里有元素,可以正常接收
   if c.qcount > 0 {
      // 直接从循环数组里找到要接收的元素
      qp := chanbuf(c, c.recvx)

      // …………

      // 没有忽略要接收的值,不是 "<- ch",而是 "val <- ch",ep 指向 val
      if ep != nil {
         typedmemmove(c.elemtype, ep, qp)
      }
      // 清理掉循环数组里相应位置的值
      typedmemclr(c.elemtype, qp)
      // 接收游标向前移动
      c.recvx++
      // 接收游标归零
      if c.recvx == c.dataqsiz {
         c.recvx = 0
      }
      // buf 数组里的元素个数减 1
      c.qcount--
      // 解锁
      unlock(&c.lock)
      return true, true
   }

   if !block {
      // 非阻塞接收,解锁。selected 返回 false,因为没有接收到值
      unlock(&c.lock)
      return false, false
   }
   
   // 构造一个 sudog 并设置相应参数
   gp := getg()
   mysg := acquireSudog()
   mysg.releasetime = 0
   if t0 != 0 {
      mysg.releasetime = -1
   }

   mysg.elem = ep
   mysg.waitlink = nil
   gp.waiting = mysg
   mysg.g = gp
   mysg.selectdone = nil
   mysg.c = c
   gp.param = nil
   // 进入 channel 的等待接收队列
   c.recvq.enqueue(mysg)
   // 将当前 goroutine 挂起
   goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)

   // 被唤醒了,接着从这里继续执行一些扫尾工作
   if mysg != gp.waiting {
      throw("G waiting list is corrupted")
   }
   gp.waiting = nil
   if mysg.releasetime > 0 {
      blockevent(mysg.releasetime-t0, 2)
   }
   closed := gp.param == nil
   gp.param = nil
   mysg.c = nil
   // 释放当前 gorountine 的 sudog
   releaseSudog(mysg)
   return true, !closed
}

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   // 如果是非缓冲型的 channel
   if c.dataqsiz == 0 {
      if raceenabled {
         racesync(c, sg)
      }
      
      // 未忽略接收的数据 不是 "<- ch",而是 "val <- ch",ep 指向 val
      if ep != nil {
         // 直接拷贝数据,从 sender goroutine -> receiver goroutine
         recvDirect(c.elemtype, sg, ep)
      }
   } else {
      // 缓冲型的 channel,但 buf 已满。
      // 1. 循环数组 buf 队首的元素拷贝到接收数据的地址
      // 2. 将 sender 的数据入队。
      qp := chanbuf(c, c.recvx)
      // …………
      // 将 recvx 处的数据拷贝给接收者
      if ep != nil {
         typedmemmove(c.elemtype, ep, qp)
      }

      // sender data -> buf
      typedmemmove(c.elemtype, qp, sg.elem)
      // 更新索引
      c.recvx++
      if c.recvx == c.dataqsiz {
         c.recvx = 0
      }
      c.sendx = c.recvx
   }
   sg.elem = nil
   gp := sg.g

   // 解锁
   unlockf()
   gp.param = unsafe.Pointer(sg)
   if sg.releasetime != 0 {
      sg.releasetime = cputicks()
   }

   // 将当前处理器的 runnext 设置成发送数据的 Goroutine,在调度器下一次调度时将阻塞的发送方唤醒。
   goready(gp, skip+1)
}

func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
    // dst is on our stack or the heap, src is on another stack.
    src := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
    memmove(dst, src, t.size)
}

从 channel 接收消息 的 核心函数是 chanrecv

跟 send 流程差不多:
特殊情况:

  1. 如果 channel 为空,那么会直接调用 runtime.gopark 挂起当前 goroutine。
  2. 如果 channel 已经关闭并且缓冲区没有任何数据,runtime.chanrecv 会直接返回零值。
    正常情况:
  3. 如果 channel 的 sendq 队列中存在挂起的 goroutine,会将 recvx 索引所在的数据拷贝到接收变量所在的内存空间上并将 sendq 队列中 goroutine 的数据拷贝到缓冲区。
  4. 如果 channel 的缓冲区中包含数据,那么直接读取 recvx 索引对应的数据。
  5. 在默认情况下会挂起当前的 goroutine,将 runtime.sudog 结构加入 recvq 队列并陷入休眠等待调度器的唤醒。
    从 channel 接收数据时,会触发 goroutine 调度的两个时机:
  6. 当 channel 为空时。
  7. 当缓冲区中不存在数据并且也不存在数据的发送者时。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1882204.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何在本地一键配置最强国产大模型

自从OpenAI的ChatGPT横空出世以来&#xff0c;国内外各类大语言模型&#xff08;LLM&#xff09;层出不穷&#xff0c;其中不乏Google的Gemini、Claude、文心一言等等。相较于竞争激烈的商业模型赛道&#xff0c;以Llama为代表的开源大模型的进步速度也十分惊人。 伴随着大语言…

乾元通渠道商中标吴忠市自然灾害应急能力提升项目

近日&#xff0c;乾元通渠道商中标宁夏回族自治区吴忠市自然灾害应急能力提升项目&#xff0c;乾元通作为设备厂家&#xff0c;为项目提供通信指挥类装备&#xff08;多链路聚合设备&#xff09;QYT-X1。 青岛乾元通数码科技有限公司作为国家应急产业企业&#xff0c;深耕于数据…

酒店品牌网站建设订房拓宽渠道如何进行

酒店在城市中有多个品牌&#xff0c;主要以住宿、餐饮为主&#xff0c;覆盖本地生活、外地旅游、活动策划场景等人群业务&#xff0c;同行竞争和自身方略发展以及客户的便捷需求&#xff0c;都使得酒店需要不同方向发力提高品牌力以及多个事项管理等。 酒店需要客户进店完成服…

GPT-5:下一代AI如何彻底改变我们的未来

GPT-5 发布前瞻&#xff1a;技术突破与未来展望 随着科技的飞速发展&#xff0c;人工智能领域不断迎来新的突破。根据最新消息&#xff0c;OpenAI 的首席技术官米拉穆拉蒂在一次采访中确认&#xff0c;GPT-5 将在一年半后发布&#xff0c;并描述了其从 GPT-4 到 GPT-5 的飞跃如…

【你也能从零基础学会网站开发】(了解)关系型数据库的基本架构体系结构与概念理解

&#x1f680; 个人主页 极客小俊 ✍&#x1f3fb; 作者简介&#xff1a;程序猿、设计师、技术分享 &#x1f40b; 希望大家多多支持, 我们一起学习和进步&#xff01; &#x1f3c5; 欢迎评论 ❤️点赞&#x1f4ac;评论 &#x1f4c2;收藏 &#x1f4c2;加关注 关系型数据库的…

《数字图像处理与机器视觉》案例三 (基于数字图像处理的物料堆积角快速测量)

一、前言 物料堆积角是反映物料特性的重要参数&#xff0c;传统的测量方法将物料自然堆积&#xff0c;测量物料形成的圆锥表面与水平面的夹角即可&#xff0c;该方法检测效率低。随着数字成像设备的推广和应用&#xff0c;应用数字图像处理可以更准确更迅速地进行堆积角测量。 …

在AvaotaA1全志T527开发板上使用 SSH 连接开发板

使用 SSH 连接开发板 启动系统 前提条件&#xff1a; 确保已经制作好AvaotaA1系统镜像至TF卡。 ​ 确保开发板电源供电正常&#xff1a;默认SPI显示屏有图案输出。 确保当前环境下有可以正常上网的路由器RJ45网线接口。 获取IP地址 如果想通过ssh去登陆开发板系统&#…

便携式气象站:科技助力,气象观测的新选择

在气象观测领域&#xff0c;便携式气象站不仅安装方便、操作简单&#xff0c;而且功能齐全、性能稳定&#xff0c;为气象观测带来了极大的便利。 首先&#xff0c;便携式气象站的便携性&#xff0c;与传统的气象站相比&#xff0c;它不需要复杂的安装过程和固定的设备基础&…

[A133]全志u-boot中的I2C驱动分析

[A133]全志u-boot中的I2C驱动分析 hongxi.zhu 2024-6-27 一、IIC标准读写时序 IIC是高位(MSB)先传输 二、代码流程 2.1主机写数据 brandy/brandy-2.0/u-boot-2018/drivers/i2c/sunxi_i2c.c static int sunxi_i2c_write(struct i2c_adapter *adap, uint8_t chip,uint32_t addr…

GA自动点击器(Auto Clicker)v1.0.83高级版

其能够在用户指定的任何应用程序中&#xff0c;根据设定的时间间隔和位置&#xff0c;自动进行重复的点击或滑动动作. 链接&#xff1a;https://pan.baidu.com/s/11AV4Zxg6y9364kADDDSFYQ?pwdbo6q 提取码&#xff1a;bo6q

深入解读一下`android.os.CountDownTimer`

简介 在 Android 开发中&#xff0c;CountDownTimer 是一个非常有用的类&#xff0c;它可以用于倒计时任务&#xff0c;比如倒计时器、限时活动等。CountDownTimer 提供了一个简单的方式来实现定时操作&#xff0c;无需我们手动管理线程和计时器。 本文将深入解析 CountDownT…

HiBit Uninstaller:软件批量卸载,一触即得

名人说&#xff1a;莫道谗言如浪深&#xff0c;莫言迁客似沙沉。 ——刘禹锡《浪淘沙》 创作者&#xff1a;Code_流苏(CSDN)&#xff08;一个喜欢古诗词和编程的Coder&#x1f60a;&#xff09; 目录 一、软件介绍1、HiBit Uninstaller2、核心功能 二、下载安装1、下载2、安装 …

文件操作与管理

程序经常需要访问文件和目录&#xff0c;读取文件信息或写入文件信息&#xff0c;在Python语言中对文件的读写是通过文件对象&#xff08;file object&#xff09;实现的。Python的文件对象也称为类似文件对象或流&#xff08;stream&#xff09;&#xff0c;因为Python提供一种…

13_网络安全

目录 网络安全协议 网络安全协议 PGP协议 网络安全技术 防火墙技术 入侵检测系统 入侵防御系统 杀毒软件 蜜罐系统 计算机病毒与木马 网络安全协议 网络安全协议 物理层主要使用物理手段隔离、屏蔽物理设备等&#xff0c;其他层都是靠协议来保证传输的安全&#xff…

【python刷题】蛇形方阵

题目描述 给出一个不大于 99 的正整数n&#xff0c;输出n*n的蛇形方阵。从左上角填上1开始&#xff0c;顺时针方向依次填入数字&#xff0c;如同样例所示。注意每个数字有都会占用3个字符&#xff0c;前面使用空格补齐。 输入 输入一个正整数n,含义如题所述 输出 输出符合…

ITK-二值阈值分割

作者&#xff1a;翟天保Steven 版权声明&#xff1a;著作权归作者所有&#xff0c;商业转载请联系作者获得授权&#xff0c;非商业转载请注明出处 什么是二值阈值分割&#xff1f; 二值阈值分割是一种常见的图像处理技术&#xff0c;用于将图像的像素值分成两个类别&#xff1…

基于GTX的64B66B编码的自定义协议发送模块(高速收发器二十一)

点击进入高速收发器系列文章导航界面 1、64B66B组帧原理 前文讲解64B66B编码原理时&#xff0c;已经讲解过组帧的原理&#xff0c;包括数据帧和控制帧两种&#xff0c;区别在于同步码不同。 下图是802.3的以太网控制协议&#xff0c;其中S表示起始位&#xff0c;T表示停止位。为…

LINUX系统编程:多线程互斥

目录 1.铺垫 2.线程锁接口的认识 静态锁分配 动态锁的分配 互斥量的销毁 互斥量加锁和解锁 3.加锁版抢票 4.互斥的底层实现 1.铺垫 先提一个小场景&#xff0c;有1000张票&#xff0c;现在有4个进程&#xff0c;这四个进程疯狂的去抢这1000张票&#xff0c;看看会发生什…

熊猫烧香是什么?

熊猫烧香&#xff08;Worm.WhBoy.cw&#xff09;是一种由李俊制作的电脑病毒&#xff0c;于2006年底至2007年初在互联网上大规模爆发。这个病毒因其感染后的系统可执行文件图标会变成熊猫举着三根香的模样而得名。熊猫烧香病毒具有自动传播、自动感染硬盘的能力&#xff0c;以及…

简单爬虫案例——爬取快手视频

网址&#xff1a;aHR0cHM6Ly93d3cua3VhaXNob3UuY29tL3NlYXJjaC92aWRlbz9zZWFyY2hLZXk9JUU2JThCJTg5JUU5JTlEJUEy 找到视频接口&#xff1a; 视频链接在photourl中 完整代码&#xff1a; import requestsimport re url https://www.kuaishou.com/graphql cookies {did: web_…