在 Go 语言发布之前,很少有语言从底层为并发原语提供支持。大多数语言还是支持共享和内存访问同步到 CSP 的消息传递方法。Go 语言算是最早将 CSP 原则纳入其核心的语言之一。内存访问同步的方式并不是不好,只是在高并发的场景下有时候难以正确的使用,特别是在超大型,巨型的程序中。基于此,并发能力被认为是 Go 语言天生优势之一。追其根本,还是因为 Go 基于 CSP 创造出来的一系列易读,方便编写的并发原语。
不要通过共享内存进行通信。建议,通过通信来共享内存。(Do not communicate by sharing memory; instead, share memory by communicating)这是 Go 语言并发的哲学座右铭。相对于使用 sync.Mutex 这样的并发原语。虽然大多数锁的问题可以通过 channel 或者传统的锁两种方式之一解决,但是 Go 语言核心团队更加推荐使用 CSP 的方式。
行文目录
channel的使用场景
把channel用在数据流动的地方:
- 消息传递、消息过滤
- 信号广播
- 事件订阅与广播
- 请求、响应转发
- 任务分发
- 结果汇总
- 并发控制
- 同步与异步
核心数据结构
hchan
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
lock mutex
}
hchan: channel 数据结构
- qcount:当前 channel 中存在多少个元素;
- dataqsize: 当前 channel 能存放的元素容量;
- buf:channel 中用于存放元素的环形缓冲区;
- elemsize:channel 元素类型的大小;
- closed:标识 channel 是否关闭;
- elemtype:channel 元素类型;
- sendx:发送元素进入环形缓冲区的 index;
- recvx:接收元素所处的环形缓冲区的 index;
- recvq:因接收而陷入阻塞的协程队列;
- sendq:因发送而陷入阻塞的协程队列;
waitq
type waitq struct {
first *sudog
last *sudog
}
waitq:阻塞的协程队列
- first:队列头部
- last:队列尾部
sudog
type sudog struct {
g *g
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
// ...
c *hchan
}
sudog:用于包装协程的节点
- g:goroutine,协程;
- next:队列中的下一个节点;
- prev:队列中的前一个节点;
- elem: 读取/写入 channel 的数据的容器;
- c:标识与当前 sudog 交互的 chan;
构造器函数
这里分成三种,无缓冲型的channel,struct类型的有缓冲的以及pointer类型的有缓冲的。
创建 channel 常见代码:
ch := make(chan int)
在底层会调用makechan64()
或者 makechan()
,这里分析makechan
方法
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// ...
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
case elem.ptrdata == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
return
}
- 首先判断申请内存空间大小是否越界,mem 大小为 element 类型大小与 element 个数相乘后得到,仅当无缓冲型 channel 时,因个数为 0 导致大小为 0;
- 根据类型,初始 channel,分为无缓冲型(mem为0)、有缓冲元素为 struct 型、有缓冲元素为 pointer 型 channel;
- 倘若为无缓冲型,则仅申请一个大小为默认值 96 (hchanSize)的空间;
- 如若有缓冲的 struct 型,则一次性分配好 96 + mem 大小的空间,并且调整 chan 的 buf 指向 mem 的起始位置;
- 倘若为有缓冲的 pointer 型,则分别申请 chan 和 buf 的空间,两者无需连续;
- 对 channel 的其余字段进行初始化,包括元素类型大小、元素类型、容量以及锁的初始化。
发送数据写流程
向 channel 中发送数据常见代码:
ch <- 1
那么会实际调用chansend1 -> chansend
方法,这里介绍下。
两类异常情况处理
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
}
- 对于未初始化的 chan,写入操作会引发死锁;
- 对于已关闭的 chan,写入操作会引发 panic.
case1:写时存在阻塞读协程(同步发送)
前提:写,存在阻塞读的协程,说明要么无缓冲,要么缓冲区满了。假设同步发送没有缓冲区,将当前写协程加入到队列里面。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ...
lock(&c.lock)
// ...
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// ..
}
- 首先进行加锁操作,保证线程安全;
- 并再一次检查 channel 是否关闭。如果关闭则抛出 panic
- 从阻塞调度的读协程队列的头部中取出第一个非空的 goroutine 的封装对象 sudog;
- 在 send 方法中,会基于
memmove
方法,直接将元素拷贝交给 sudog 对应的 goroutine; - 在 send 方法中会完成解锁动作.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
send() 函数主要完成了 2 件事:
- 调用
sendDirect()
函数将数据拷贝到了接收变量的内存地址上 - 调用
goready()
将等待接收的阻塞 goroutine 的状态从Gwaiting
或者Gscanwaiting
改变成Grunnable
。下一轮调度时会唤醒这个接收的 goroutine。
case2:写时无阻塞读协程且环形缓冲区仍有空间(异步发送)
前提:缓冲区还有空闲位置,能够直接将数据写入缓冲区。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ...
lock(&c.lock)
// ...
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
// ...
}
- 首先进行加锁操作,保证线程安全;
- 将当前元素添加到环形缓冲区
sendx
对应的位置; - sendx++;
- qcount++;
- 解锁,返回。
case3:写时无阻塞读协程且环形缓冲区无空间(阻塞发送)
前提:当 channel 处于打开状态,但是没有接收者,并且没有 buf 缓冲队列或者 buf 队列已满,这时 channel 会进入阻塞发送。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ...
lock(&c.lock)
// ...
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.g = gp
mysg.c = c
gp.waiting = mysg
c.sendq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
gp.waiting = nil
closed := !mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true
}
- 首先进行加锁操作,保证线程安全;
- 调用
getg()
方法获取当前 goroutine 的指针,用于绑定给一个sudog
- 调用
acquireSudog()
方法获取一个sudog
,可能是新建的sudog
,也有可能是从缓存中获取的。设置好 sudog 要发送的数据和状态。比如发送的 Channel、是否在 select 中和待发送数据的内存地址等等。 - 完成指针指向,建立
sudog
、goroutine
、channel
之间的指向关系;把sudog
添加到当前channel
的阻塞写协程队列中; - 调用 gopark 方法挂起当前 goroutine,状态为 waitReasonChanSend,阻塞等待 channel。
- 倘若协程从 park 中被唤醒,则回收 sudog(sudog能被唤醒,其对应的元素必然已经被读协程取走);
- 解锁,返回
写流程整体串联
小结
关于 channel 发送的源码实现已经分析完了,针对 channel 各个状态做一个小结。
Channel Status | Result | |
---|---|---|
Write | nil | 阻塞 |
Write | 打开但填满 | 阻塞 |
Write | 打开但未满 | 成功写入值 |
Write | 关闭 | panic |
Write | 只读 | Compile Error |
channel 发送过程中包含 2 次有关 goroutine 调度过程:
- 当接收队列中存在 sudog 可以直接发送数据时,执行
goready()
将 g 插入 runnext 插槽中,状态从 Gwaiting 或者 Gscanwaiting 改变成 Grunnable,等待下次调度便立即运行。 - 当 channel 阻塞时,执行
gopark()
将 g 阻塞,让出 cpu 的使用权。
需要强调的是,通道并不提供跨 goroutine 的数据访问保护机制。如果通过通道传输数据的一份副本,那么每个 goroutine 都持有一份副本,各自对自己的副本做修改是安全的。当传输的是指向数据的指针时,如果读和写是由不同的 goroutine 完成的,那么每个 goroutine 依旧需要额外的同步操作。
读流程
异常 case1:读空 channel
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil {
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// ...
}
- park 挂起,引起死锁,报错异常;
异常 case2:channel 已关闭且内部无元素
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
lock(&c.lock)
if c.closed != 0 {
if c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// The channel has been closed, but the channel's buffer have data.
}
// ...
}
如果 channel 已经关闭且不存在缓存数据了,则清理 ep 指针中的数据并返回。这里也是从已经关闭的 channel 中读数据,读出来的是该类型零值的原因。
读流程
异常 case1:读空 channel
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil {
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// ...
}
- park 挂起,引起死锁,报错异常;
异常 case2:channel 已关闭且内部无元素
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
lock(&c.lock)
if c.closed != 0 {
if c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// The channel has been closed, but the channel's buffer have data.
}
// ...
}
如果 channel 已经关闭且不存在缓存数据了,则清理 ep 指针中的数据并返回。这里也是从已经关闭的 channel 中读数据,读出来的是该类型零值的原因。
case3:读时有阻塞的写协程(同步接收)
前提:在 channel 的发送队列中找到了等待发送的 goroutine。取出队头等待的 goroutine。
- 如果缓冲区的大小为 0,则直接从发送方接收值。
- 否则,对应缓冲区满的情况,从队列的头部接收数据,发送者的值添加到队列的末尾(此时队列已满,因此两者都映射到缓冲区中的同一个下标)。
也就是问题在go channel中,对于有缓冲的channel,如果channel满了,且有阻塞的写协程,此时有一个读协程,读取数据,那么流程是什么?
以下是具体的流程:
- 有一个有缓冲的 channel,其缓冲区已满。
- 一个写协程尝试向这个 channel 写数据,但因为 channel已满,所以写协程被阻塞。
- 同时,有一个读协程尝试从这个 channel 读数据。读协程成功读取到一个数据项,这将在 channel的缓冲区中腾出一个空位。
- 被阻塞的写协程立刻被唤醒,将数据写入刚刚腾出的空位。
- 读协程和写协程继续执行它们剩下的操作。
同步接收的核心逻辑见下面 recv()
函数:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
lock(&c.lock)
// Just found waiting sender with not closed.
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
goready(gp, skip+1)
}
需要注意的是由于有发送者在等待,所以如果存在缓冲区,那么缓冲区一定是满的。这个情况对应发送阶段阻塞发送的情况,如果缓冲区还有空位,发送的数据直接放入缓冲区,只有当缓冲区满了,才会打包成 sudog,插入到 sendq 队列中等待调度。注意理解这一情况。
接收时主要分为 2 种情况,有缓冲且 buf 满和无缓冲的情况:
- 无缓冲。ep 发送数据不为 nil,调用
recvDirect()
将发送队列中sudog
存储的 ep 数据直接拷贝到接收者的内存地址中。 - 有缓冲并且 buf 满。有 2 次 copy 操作,先将队列中
recvx
索引下标的数据拷贝到接收方的内存地址,再将发送队列头的数据拷贝到缓冲区中,释放一个sudog
阻塞的 goroutine。[备注:缓冲区满了,可以直接读,但是读完需要善后,也就是把阻塞写协程的值拷贝过来,然后释放]。
case4:读时无阻塞写协程且缓冲区有元素(异步接收)
前提:如果 Channel 的缓冲区中包含一些数据时,但没有满,从 Channel 中接收数据会直接从缓冲区中 recvx 的索引位置中取出数据进行处理:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
lock(&c.lock)
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
}
- 加锁;
- 获取到 recvx 对应位置的元素;
- recvx++;
- qcount–;
- 解锁,返回;
case5:读时无阻塞写协程且缓冲区无元素(阻塞接收)
前提:如果 channel 发送队列上没有待发送的 goroutine,并且缓冲区也没有数据时,将会进入到最后一个阶段阻塞接收:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
lock(&c.lock)
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
gp.waiting = mysg
mysg.g = gp
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
gp.waiting = nil
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}
- 调用 getg() 方法获取当前 goroutine 的指针,用于绑定给一个 sudog。
- 调用
acquireSudog()
方法获取一个 sudog,可能是新建的 sudog,也有可能是从缓存中获取的。设置好 sudog 要发送的数据和状态。比如发送的 Channel、是否在 select 中和待发送数据的内存地址等等。 - 调用
c.recvq.enqueue
方法将配置好的 sudog 加入待发送的等待队列。 - 设置原子信号。当栈要 shrink 收缩时,这个标记代表当前 goroutine 还 parking 停在某个 channel 中。在 g 状态变更与设置 activeStackChans 状态这两个时间点之间的时间窗口进行栈 shrink 收缩是不安全的,所以需要设置这个原子信号。
- 调用 gopark 方法挂起当前 goroutine,状态为 waitReasonChanReceive,阻塞等待 channel。
读流程和整体串联
状态表
操作 | nil的channel | 正常channel | 已关闭channel |
---|---|---|---|
<- ch(读) | 阻塞 | 成功或阻塞 | 有数据,正常接收 无数据,零值 |
ch <-(发) | 阻塞 | 成功或阻塞 | panic |
close(ch) | panic | 成功 | panic |
往 nil channel 上进行操作:
- 发送:如果你尝试发送数据到一个 nil channel 上,操作会被阻塞。
- 接收:如果你尝试从一个 nil channel 接收数据,操作也会被阻塞。
- 关闭:你不能关闭一个 nil channel,否则会触发 panic。
- 对已关闭的 channel 进行操作:
- 发送:如果你尝试向一个已关闭的 channel 发送数据,会触发 panic。
- 接收:你可以从一个已关闭的 channel 接收数据。如果 channel 中还有数据,你会正常接收到数据;如果 channel 为空,你会接收到该通道类型的零值。接收操作不会被阻塞。
- 关闭:如果你尝试关闭一个已经关闭的 channel,会触发 panic。
参考
https://zhuanlan.zhihu.com/p/597232906