1、引言
Do not communicate by sharing memory; instead, share memory by communicating
Golang
的并发哲学是“不要通过共享内存进行通信,而要通过通信来共享内存”,提倡通过 channel
进行 goroutine
之间的数据传递和同步,而不是通过共享变量(内存)来实现。
func write(chanInt chan int) {
for i := 0; i < 10; i++ {
chanInt <- i
}
close(chanInt)
}
func read(chanInt chan int, chanExit chan bool) {
for {
v, ok := <-chanInt
if !ok {
break
}
fmt.Println(v)
}
chanExit <- true
close(chanExit)
}
func TestCSP(t *testing.T) {
chanInt := make(chan int, 10)
chanExit := make(chan bool)
go write(chanInt)
go read(chanInt, chanExit)
for {
select {
case _, ok := <-chanExit:
if !ok {
fmt.Println("done")
return
}
}
}
}
如上述示例,write
函数负责写,read
函数负责读,chanInt
负责在两个 goroutine
进行数据同步,chanExit
负责监听数据已处理完成,并最终退出。整个程序没有看到锁,非常的优雅。
接下来,来说说 channel
的特性,最后结合底层源码来加深印象。
2、特性
2.1 基本用法
由于 channel
是引用类型,需要用 make
来初始化
chanBuffer := make(chan int, 10)
chanNoBuffer := make(chan int)
这里创建的是可读写的 channel
,区别在于是否有 capacity
(容量)
- 带缓冲区的
channel
,可以存储cap
个数据 - 不带缓冲区的
channel
,一般用于同步
chanWriteOnly := make(chan<- int)
chanReadOnly := make(<-chan int)
这里创建的是只写和只读的 channel
,不过这样写意义不大,一般用于传参,接下来用这两个 chan
把引言示示例中关于 write
和 read
函数给改下
func write(chanInt chan<- int) {
for i := 0; i < 10; i++ {
chanInt <- i
}
close(chanInt)
}
func read(chanInt <-chan int, chanExit chan bool) {
for {
v, ok := <-chanInt
if !ok {
break
}
fmt.Println(v)
}
chanExit <- true
close(chanExit)
}
查看 channel
的长度和容量
func TestChanLenCAP(t *testing.T) {
chanInt := make(chan int, 2)
chanInt <- 1
fmt.Println(len(chanInt)) // 1
fmt.Println(cap(chanInt)) // 2
}
关闭 channel
close(ch)
判断 channel
是否已关闭
func TestChanIsClosed(t *testing.T) {
chanInt := make(chan int, 10)
close(chanInt)
if _, ok := <-chanInt; !ok {
fmt.Println("closed")
}
}
向一个已关闭的 channel
读数据,会读到零值,并且每次读也都是零值,因此可以利用这个特性来判断 channel
是否已关闭。
2.2 异常情况
接下来看看几种需要注意的异常情况
注意: Golang 版本为 1.19.12。不同版本的调度器和运行时的行为可能会有所不同,尤其是与死锁检测相关的机制。这些变化可能导致在某些版本中程序会更快地检测到死锁,而在其他版本中则可能仅仅是阻塞而不报错。
2.2.1 给一个 nil channel
发送数据,
func TestWriteNil(t *testing.T) {
var chanInt chan int
chanInt <- 1
}
由于 chanInt
还没初始化,值为 nil
,此时代码会阻塞在 chanInt <- 1
这一行,并最终形成死锁。
fatal error: all goroutines are asleep - deadlock!
解法:channel
使用前需要使用 make
初始化。
2.2.2 从一个 nil channel
读数据
func TestReadNil(t *testing.T) {
var chanInt chan int
<-chanInt
}
由于 chanInt
还没初始化,值为 nil
,此时代码会阻塞在 <-chanInt
这一行,并最终形成死锁。
fatal error: all goroutines are asleep - deadlock!
解法:channel
使用前需要使用 make
初始化。
2.2.3 关闭一个 nil channel
func TestCloseNil(t *testing.T) {
var chanInt chan int
close(chanInt)
}
如果尝试关闭一个 nil
的 channel
,会导致运行时错误 panic: close of nil channel
。
panic: close of nil channel [recovered]
panic: close of nil channel
解法:channel
使用前需要使用 make
初始化。
前三个异常说明,channel
使用前一定要使用 make
进行初始化。
2.2.4 向一个已关闭的 channel
发数据
func TestWriteClosed(t *testing.T) {
chanNoBuffer := make(chan int)
close(chanNoBuffer)
chanNoBuffer <- 1
}
向一个已关闭的 channel
发送数据会引起 panic
。
panic: send on closed channel [recovered]
panic: send on closed channel
这是因为一旦 channel
被关闭,就不能再向其发送数据,但可以继续从中接收数据
。
解法:判断 channel
是否已关闭。
2.2.5 向一个已关闭的 channel
发起重复关闭动作
func TestClosedOnceMore(t *testing.T) {
chanNoBuffer := make(chan int)
close(chanNoBuffer)
close(chanNoBuffer)
}
尝试关闭一个已经关闭的 channel
会导致运行时错误 panic: close of closed channel
。这个错误通常出现在多个 goroutine
试图关闭同一个 channel
或者代码逻辑不正确导致同一个 channel
被关闭多次。
panic: close of closed channel [recovered]
panic: close of closed channel
解法:判断 channel
是否已关闭。
2.2.6 向没有缓冲区的 channel
写数据,但没有读取方
func TestSendNoBuffer(t *testing.T) {
ch := make(chan int)
ch <- 4
}
无缓冲的 channel
是一种同步通信机制,当只有发送方,没有接收方,会陷入阻塞而死锁。
fatal error: all goroutines are asleep - deadlock!
解法:无缓冲 channel
是一种同步通信机制,需要发送和接收操作同时进行。
2.2.7 向没有缓冲区的 channel
读取数据,但没有写入方
func TestReadNoBuffer(t *testing.T) {
ch := make(chan int)
<-ch
}
尝试从一个无缓冲的 channel
读取数据时,如果没有其他 goroutine
向该 channel
发送数据,读取操作将会阻塞。这会导致程序死锁,并最终导致运行时错误。
fatal error: all goroutines are asleep - deadlock!
解法:无缓冲 channel
是一种同步通信机制,需要发送和接收操作同时进行。
2.2.8 无缓冲区 channel
的发送和接收操作没有同时进行
func ReadNoBufferChan(chanBool chan bool) {
<-chanBool
}
func TestSendNoBufferChan(t *testing.T) {
ch := make(chan bool)
ch <- true
go ReadNoBufferChan(ch)
time.Sleep(1 * time.Second)
}
上面两个异常一直强调,由于无缓冲 channel
是一种同步
通信机制,需要发送和接收操作同时
进行。代码执行到 ch <- chan
时,调度器发现没有任何 goroutine
接收,于是阻塞并死锁。
fatal error: all goroutines are asleep - deadlock!
解法:无缓冲 channel
是一种同步通信机制,需要发送和接收操作同时进行。
func TestSendNoBufferChan(t *testing.T) {
ch := make(chan bool)
go ReadNoBufferChan(ch)
ch <- true
time.Sleep(1 * time.Second)
}
把 go ReadNoBufferChan(ch)
提前,这样就确保了在发送数据之前,有一个 goroutine
正在等待接收数据。
对于无缓冲的 channel
- 读取和写入要成对出现,并且不能在同一个
goroutine
里 - 使用
for
读取数据时,写入方需要关闭channel
2.2.9 向有缓存区的 channel
先读数据
func TestWriteBufferChan(t *testing.T) {
ch := make(chan int, 1)
if _, ok := <-ch; !ok {
fmt.Println("closed")
}
}
当尝试从一个空的带缓冲的 channel
读取数据时,读取操作会阻塞,直到有数据被写入 channel
。这是因为即使是带缓冲的 channel
,也需要在读取数据时有数据可读。
带缓冲的 channel
和无缓冲的 channel
的主要区别在于:带缓冲的 channel
可以存储一定数量的数据,而无缓冲的 channel
则需要发送和接收操作同步进行。然而,这并不改变以下事实:当一个 goroutine
试图从空的 channel
读取数据时,它会被阻塞,直到有其他 goroutine
写入数据。
fatal error: all goroutines are asleep - deadlock!
解法:需要在读取数据时有数据可读。
2.2.10 向有缓存区的 channel
写数据,但没有读取数据
func TestReadBufferChan(t *testing.T) {
ch := make(chan int, 1)
ch <- 1
ch <- 2
}
当带缓冲的 channel
在缓冲区满时,写入操作会阻塞,直到有数据被读取以腾出缓冲区空间。如没有读取方,最后就会因阻塞而死锁。
fatal error: all goroutines are asleep - deadlock!
解法:当带缓冲的 channel
在缓冲区满时,需要有读取方,或者增加缓冲区的大小。
注意:对于带缓冲的 channel 在缓冲区没超过容量之前,写入数据,若没有读取,不像不带缓冲区的 channel 那样,不会产生死锁的。
其实,最后这两个带缓冲区 channel
异常情况总结就是
- 若在同一个
goroutine
里,写数据操作一定在读数据操作前 - 若
channel
空了,接收者会阻塞 - 若
channel
满了,发送者会阻塞
3、底层实现
3.1 数据结构
Golang
的 channel
在运行时使用 runtime.hchan
结构体表示。
// runtime/chan.go
type hchan struct {
qcount uint // 队列中的数据个数
dataqsiz uint // 环形缓冲区的大小
buf unsafe.Pointer // 环形缓冲区指针
elemsize uint16 // 单个元素的大小
closed uint32 // 标志 channel 是否关闭
elemtype *_type // 元素的类型
sendx uint // 发送操作的索引
recvx uint // 接收操作的索引
recvq waitq // 等待接收的 goroutine 队列
sendq waitq // 等待发送的 goroutine 队列
lock mutex // 保护 channel 的锁
}
先看看环形缓冲区相关的字段:
qcount
: 当前缓冲区中的元素个数。dataqsiz
: 环形缓冲区的容量。buf
: 实际存储数据的缓冲区,类型为unsafe.Pointer
(类似C
语言的void *
)。elemsize
: 每个元素的大小。sendx
: 环形缓冲区中下一个待写入的位置。recvx
: 环形缓冲区中下一个待读取的位置。
再来看看发送和接收队列:
recvq
: 等待接收的goroutine
队列。sendq
: 等待发送的goroutine
队列。
这两个队列是通过 waitq
结构体来实现的,waitq
本质上是一个双向链表,链表中的每个节点是一个 sudog
结构体,sudog
代表一个等待中的 goroutine
。
type waitq struct {
first *sudog
last *sudog
}
最后看看 lock
字段
lock
锁用于保护channel
数据结构的互斥锁。Golang
使用自旋锁和互斥锁的结合来保证channel
操作的线程安全。
3.2 初始化
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
这里主要说下 switch
相关的分支代码
- 第一个分支:如果
channel
的缓冲区大小是0
(也就是创建无缓冲channel
),或channel
中的元素大小是0
(如struct{}{}
,Golang
中“空结构体”是不占内存的,size
为0
)时,调用mallocgc()
在堆上为channel
开辟一段大小为hchanSize
的内存空间。- 这里说下
c.buf = c.raceaddr()
,c.raceaddr()
会返回一个地址,这个地址在内存中不会被实际用于存储数据,但会被数据竞争检测工具(如Golang
的race detector
)用于同步,这也是无缓冲区的channel
用来做数据同步场景的由来。
- 这里说下
- 第二个分支:如果元素不包含指针时。调用
mallocgc
一次性分配hchan
和buf
的内存。 - 第三个分支:默认情况元素类型中有指针类型,调用了两次分配空间的函数
new/mallocgc
。
仔细看,三个分支都调用了 mallocgc
在堆上分配内存,也就说 channel
本身会被 GC
自动回收。
在函数的最后会初始化通道结构的字段,包括元素大小、元素类型、缓冲区大小和锁。
3.2 发送数据
// entry point for c <- x from compiled code
//
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
/*
* generic single channel send/recv
* If block is not nil,
* then the protocol will not
* sleep but return if it could
* not complete.
*
* sleep can wake up with g.param == nil
* when a channel involved in the sleep has
* been closed. it is easiest to loop and re-run
* the operation; we'll see that it's now closed.
*/
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 当 channel 为 nil 时处理
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
// 竞态检测,是用来分析是否存在数据竞争。go test -race ./...
if raceenabled {
racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not closed, we observe that the channel is
// not ready for sending. Each of these observations is a single word-sized read
// (first c.closed and second full()).
// Because a closed channel cannot transition from 'ready for sending' to
// 'not ready for sending', even if the channel is closed between the two observations,
// they imply a moment between the two when the channel was both not yet closed
// and not ready for sending. We behave as if we observed the channel at that moment,
// and report that the send cannot proceed.
//
// It is okay if the reads are reordered here: if we observe that the channel is not
// ready for sending and then observe that it is not closed, that implies that the
// channel wasn't closed during the first observation. However, nothing here
// guarantees forward progress. We rely on the side effects of lock release in
// chanrecv() and closechan() to update this thread's view of c.closed and full().
if !block && c.closed == 0 && full(c) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 加锁
lock(&c.lock)
// 检查 channel 是否关闭
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 检查是否有等待接收的 goroutine
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 检查 channel 缓冲区是否有空位
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
// 非阻塞模式下
if !block {
unlock(&c.lock)
return false
}
// 阻塞模式下,将当前 goroutine 加入发送队列并挂起,receiver 会帮我们完成后续的工作
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
// 打包 sudog
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 将当前这个发送 goroutine 打包后的 sudog 入队到 channel 的 sendq 队列中
c.sendq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
// 将这个发送 g 从 Grunning -> Gwaiting
// 进入休眠
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive(ep)
// 以下唤醒后需要执行的代码
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
if closed {
// 唤醒后,发现 channel 被关闭了
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
}
代码比较长,可以分为两大部分:异常检测和发送数据
3.2.1 异常检测
代码一开始就排除了在异常章节中 nil channel
的情形,比如未初始化,或是被 GC
回收了。
接着会检测非阻塞模式下,也就是有缓冲区的 channel
,如果还未 close
并且缓冲区已经满了,则直接返回 false
。
func TestASyncSendFull(t *testing.T) {
ch := make(chan int, 1) // 创建一个缓冲区大小为 1 的 channel
ch <- 1 // 向 channel 发送一个元素,此时缓冲区已满
select {
case ch <- 2: // 尝试发送第二个元素
fmt.Println("Successfully sent 2")
default: // 缓冲区已满,进入 default 分支
fmt.Println("channel is full, unable to send 2")
}
}
3.2.2 发送数据
发送数据可以归纳为以下三点
- 直接发送:当
recvq
存在等待的接收者时,那么通过runtime.send
直接将数据发送给阻塞的接收者- 注意:这里不会立马唤醒阻塞的接收者,而是将等待接收数据的
goroutine
标记成可运行状态grunnable
并把该goroutine
放到发送方所在的处理器的runnext
上等待执行,该处理器在下一次调度
时会立刻唤醒数据的接收方;
- 注意:这里不会立马唤醒阻塞的接收者,而是将等待接收数据的
- 异步发送:当
buf
缓冲区存在空余空间时,将发送的数据写入channel
的缓冲区; - 阻塞发送:当不存在缓冲区或者缓冲区已满时,等待其他
goroutine
从channel
接收数据;- 将当前
goroutine
加入sendq
发送队列并挂起,阻塞等待其他的协程从channel
接收数据; - 当唤醒后,检查是否因为
channel
关闭而唤醒,如果是则触发panic
。
- 将当前
发送数据的过程中包含几个会触发 goroutine
调度的时机:
- 发送数据时发现
channel
上存在等待接收数据的goroutine
,立刻设置处理器的runnext
属性,但是并不会立刻触发调度 - 发送数据时并没有找到接收方并且缓冲区已经满了,这时会将自己加入
channel
的sendq
发送队列并调用runtime.goparkunlock
触发goroutine
的调度让出处理器的使用权;
3.3 接收数据
// entry points for <- c from compiled code
//
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// raceenabled: don't need to check ep, as it is always on the stack
// or is new memory allocated by reflect.
if debugChan {
print("chanrecv: chan=", c, "\n")
}
// 如果在 nil channel 上进行 recv 操作,那么会永远阻塞
if c == nil {
// 非阻塞的情况下,要直接返回,非阻塞出现在一些 select 的场景中
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
if !block && empty(c) {
// After observing that the channel is not ready for receiving, we observe whether the
// channel is closed.
//
// Reordering of these checks could lead to incorrect behavior when racing with a close.
// For example, if the channel was open and not empty, was closed, and then drained,
// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
// we use atomic loads for both checks, and rely on emptying and closing to happen in
// separate critical sections under the same lock. This assumption fails when closing
// an unbuffered channel with a blocked send, but that is an error condition anyway.
if atomic.Load(&c.closed) == 0 {
// Because a channel cannot be reopened, the later observation of the channel
// being not closed implies that it was also not closed at the moment of the
// first observation. We behave as if we observed the channel at that moment
// and report that the receive cannot proceed.
return
}
// The channel is irreversibly closed. Re-check whether the channel has any pending data
// to receive, which could have arrived between the empty and closed checks above.
// Sequential consistency is also required here, when racing with such a send.
if empty(c) {
// The channel is irreversibly closed and empty.
if raceenabled {
raceacquire(c.raceaddr())
}
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
// 当前 channel 中没有数据可读
if c.closed != 0 {
if c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// The channel has been closed, but the channel's buffer have data.
} else {
// sender 队列中有 sudog 在等待
// 直接从该 sudog 中获取数据拷贝到当前 g 即可
// Just found waiting sender with not closed.
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
if c.qcount > 0 {
// 直接从 buffer 里拷贝数据
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
// 接收索引 +1
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// buffer 元素计数 -1
c.qcount--
unlock(&c.lock)
return true, true
}
// 非阻塞时,且无数据可收
if !block {
unlock(&c.lock)
return false, false
}
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
// 打包成 sudog
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
// 进入 recvq 队列
c.recvq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// someone woke us up
// 被唤醒
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
// 如果 channel 未被关闭,那就是真的 recv 到数据了
return true, success
}
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
if ep != nil {
// copy data from sender
// 直接从发送者复制数据
recvDirect(c.elemtype, sg, ep)
}
} else {
// 缓冲区已满,从队列头部取出数据
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
}
// 将数据从队列复制到接收者
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 将数据从发送者复制到队列
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
在 Golang
的 channel
中,有两种接收方式
num <- ch
num, ok <- ch
这两种分别对应上述源码中的 chanrecv1
和 chanrecv2
,不过最终都会走到 chanrecv
函数。
3.3.1 异常检测
当我们从一个 nil channel
接收数据时(这里 nil
有可能是被 GC
回收导致的),若是非阻塞的 channel
会直接返回,否则会直接调用 runtime.gopark
让出处理器的使用权。
如果当前 channel
已经被 close
并且缓冲区中不存在任何数据,那么会清除 ep
指针中的数据并立刻返回。这里也就说明了为什么可以多次从已关闭的 channel
读取数据而不会报错。
3.3.2 接收数据
从 channel
接收数据可以归纳为以下三种情况:
3.3.2.1 直接接收
当 sendq
发送队列存在等待的发送者时,通过 runtime.recv
从阻塞的发送者或者缓冲区中获取数据。具体分为以下两种场景,可以仔细看 recv
函数
- 场景一
当 buf
缓冲区的容量 dataqsiz
为 0
,也就是同步的 channel
,调用 recvDirect
将 sendq
发送队列中 sudog
存储的 ep
数据直接拷贝到接收者的内存地址中。
- 场景二
当缓冲区已满时(会有两次内存的拷贝)
- 先取出
buf
缓冲区头部的数据发给接收者(第一次拷贝) - 接着取出
sendq
发送队列头的数据拷贝到buf
缓冲区中,并释放一个sudog
阻塞的goroutine
(第二次拷贝)
到这里获取有人会问,为什么不直接从 sendq
取出数据发给接收方,而是要从 buf
里取出发给接收方?
原因在于 Golang
在缓冲模式下,channel
的数据在缓冲区中按照 FIFO
(先入先出)顺序存储。缓冲区头部的数据肯定是最先存入的,那么也就需要最先取出。
这里再说下场景二下关于 recvx
和 sendx
的更新机制。
- 缓冲区已满时的处理逻辑
当 buf
缓冲区满时,recvx
指向的是 buf
的头部位置,这也是下一个将要被接收的数据。注意此时 sendx
也是指向缓冲区的头部位置。因为缓冲区已满,下一次发送会覆盖最旧的数据。
- 从缓冲区读取数据
此时从已满的 buf
缓冲区读取数据,接收者从缓冲区的头部位置 recvx
获取数据,并将数据传递给接收方。并更新 recvx
,使其指向下一个将要被接收的数据位置。
- 将
sendq
拷贝到缓冲区
由于此时 buf
头部的数据已经发送,那么则取出 sendq
头部的数据覆盖刚刚头部的位置所在的数据,并更新 sendx
,使其和 recvx
保持一致,指向下一个要发送的位置。
这两个场景,无论发生哪种情况,运行时都会调用 runtime.goready
将当前处理器的 runnext
设置成发送数据的 goroutine
,在调度器下一次调度时将阻塞的发送方唤醒。
3.3.2.2 异步接收
当 buf
缓冲区的 qcount
大于 0
时,也就是带缓冲的 channel
有数据时,那么会从 buf
缓冲区中 recvx
的索引位置取出数据进行处理:
- 如果接收数据的内存地址不为空,那么会使用
runtime.typedmemmove
将缓冲区中的数据拷贝到内存中,并通过runtime.typedmemclr
清除队列中的数据 - 最后更新
channel
上相关数据:recvx
指向下一个位置(如果移动到了环形队列的队尾,下标需要回到队头),channel
的qcount
长度减一,并释放持有channel
的锁
3.3.2.3 阻塞接收
当不属于上述两种情况,即当 channel
的 sendq
发送队列中不存在等待的 goroutine
并且 buf
缓冲区中也不存在任何数据时,从 channel
中接收数据的操作会变成阻塞的。此时会将当前的goroutine
挂起并加入 channel
的接收队列 recvq
,以便在有数据可用时能够被唤醒。
当然了,若是 goroutine
被唤醒后会完成 channel
的阻塞数据接收。接收完最后进行基本的参数检查,解除 channel
的绑定并释放 sudog
。
结合异常检测那一节,发现从 channel
接收数据时,会触发 goroutine
调度的两个时机:
- 当
channel
为nil
时 - 当
buf
缓冲区中不存在数据并且也不存在数据的发送者时
3.4 关闭管道
最后来看看关闭通道实现
func closechan(c *hchan) {
// 关闭一个 nil channel 会直接 panic
if c == nil {
panic(plainError("close of nil channel"))
}
// 上锁,这个锁的粒度比较大,一直到释放完所有的 sudog 才解锁
lock(&c.lock)
// 在 close channel 时,如果 channel 已经关闭过了,直接触发 panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
racerelease(c.raceaddr())
}
c.closed = 1
var glist gList
// release all readers
for {
sg := c.recvq.dequeue()
// 弹出的 sudog 是 nil,说明读队列已经空了
if sg == nil {
break
}
// sg.elem unsafe.Pointer,指向 sudog 的数据元素
// 该元素可能在堆上分配,也可能在栈上
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 将 goroutine 入 glist
// 为最后将全部 goroutine 都 ready 做准备
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// release all writers (they will panic)
// 将所有挂在 channel 上的 writer 从 sendq 中弹出
// 该操作会使所有 writer panic(向一个关闭的 channel 发数据会引起 panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 将所有挂在 channel 上的 writer 从 sendq 中弹出
// 该操作会使所有 writer panic
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// 在释放所有挂在 channel 上的读或写 sudog 时,是一直在临界区的
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
// 使 g 的状态切换到 Grunnable
goready(gp, 3)
}
}
3.4.1 异常检测
- 关闭一个
nil channel
会直接panic
- 在
close channel
时,如果channel
已经关闭过了,直接触发panic
3.4.2 释放所有接收方和发送方
关闭 channel
的主要工作是释放所有的 readers
和 writers
。
主要就是取出 recvq
和 sendq
的 sudog
加入到 goroutine
待清除 glist
队列中,与此同时该函数会清除所有 runtime.sudog
上未被处理的元素。同时需要注意的是:在处理 sendq
时有可能会 panic
,在之前的异常情况中列举往一个 close
的 channel
发送数据会引起 panic
。
最后会为所有被阻塞的 goroutine
调用 runtime.goready
触发调度。将所有 glist
队列中的 goroutine
状态从 _Gwaiting
设置为 _Grunnable
状态,等待调度器的调度。
3.4.3 优雅关闭通道
最后说说如何优雅关闭 channel
。
通过之前的异常小节介绍,发现:
- 向已关闭的
channel
发送数据,会导致panic
- 重复关闭
channel
,也会导致panic
同时,还了解了:
- 从一个已关闭的
channel
中接收数据,会得到零值,且不会导致程序异常 - 关闭一个
channel
,那么所有接收这个channel
的select case
都会收到信号
那么这里就引用 How to Gracefully Close Channels 介绍的优雅关闭 channel
方法来收尾。
package _0240623
import (
"log"
"math/rand"
"strconv"
"sync"
"testing"
"time"
)
func TesGracefullyCloseChannel(t *testing.T) {
rand.Seed(time.Now().UnixNano()) // needed before Go 1.20
log.SetFlags(0)
// ...
const Max = 100000
const NumReceivers = 10
const NumSenders = 1000
wgReceivers := sync.WaitGroup{}
wgReceivers.Add(NumReceivers)
// ...
dataCh := make(chan int)
stopCh := make(chan struct{})
// stopCh is an additional signal channel.
// Its sender is the moderator goroutine shown
// below, and its receivers are all senders
// and receivers of dataCh.
toStop := make(chan string, 1)
// The channel toStop is used to notify the
// moderator to close the additional signal
// channel (stopCh). Its senders are any senders
// and receivers of dataCh, and its receiver is
// the moderator goroutine shown below.
// It must be a buffered channel.
var stoppedBy string
// moderator
go func() {
stoppedBy = <-toStop
close(stopCh)
}()
// senders
for i := 0; i < NumSenders; i++ {
go func(id string) {
for {
value := rand.Intn(Max)
if value == 0 {
// Here, the try-send operation is
// to notify the moderator to close
// the additional signal channel.
select {
case toStop <- "sender#" + id:
default:
}
return
}
// The try-receive operation here is to
// try to exit the sender goroutine as
// early as possible. Try-receive and
// try-send select blocks are specially
// optimized by the standard Go
// compiler, so they are very efficient.
select {
case <-stopCh:
return
default:
}
// Even if stopCh is closed, the first
// branch in this select block might be
// still not selected for some loops
// (and for ever in theory) if the send
// to dataCh is also non-blocking. If
// this is unacceptable, then the above
// try-receive operation is essential.
select {
case <-stopCh:
return
case dataCh <- value:
}
}
}(strconv.Itoa(i))
}
// receivers
for i := 0; i < NumReceivers; i++ {
go func(id string) {
defer wgReceivers.Done()
for {
// Same as the sender goroutine, the
// try-receive operation here is to
// try to exit the receiver goroutine
// as early as possible.
select {
case <-stopCh:
return
default:
}
// Even if stopCh is closed, the first
// branch in this select block might be
// still not selected for some loops
// (and forever in theory) if the receive
// from dataCh is also non-blocking. If
// this is not acceptable, then the above
// try-receive operation is essential.
select {
case <-stopCh:
return
case value := <-dataCh:
if value == Max-1 {
// Here, the same trick is
// used to notify the moderator
// to close the additional
// signal channel.
select {
case toStop <- "receiver#" + id:
default:
}
return
}
log.Println(value)
}
}
}(strconv.Itoa(i))
}
// ...
wgReceivers.Wait()
log.Println("stopped by", stoppedBy)
}
这段代码的核心是这里
// moderator
go func() {
stoppedBy = <-toStop
close(stopCh)
}()
对于生产者和消费者是 M*N
的情况,显然既不能在生产方关闭通道,也不适合在消费方关闭通道。那么就引入中间方,那就是 toStop
,起个 goroutine
然后 stoppedBy = <-toStop
阻塞在这里,只要生产者和消费者一方满足条件,向 toStop
写入数据了,那么就可以关闭 stopCh
。这也正好契合上面的 moderator
注释,一个 协调者
,用来协调生产者和消费者在 M*N
情况下如何优雅关闭 channel
。