简介
channel顾名思义就是channel的意思,主要用来在协程之间传递数据,所以是并发安全的。其实现原理,其实就是一个共享内存再加上锁,底层阻塞机制使用的是GMP模型。可见 GMP模型就是那个道,道生一,一生二,二生三,三生万物。
一个简单的例子 如下:
func main() {
c := make(chan int32, 1)
go func() {
c <- 1
}()
go func() {
fmt.Println(<-c)
}()
time.Sleep(2 * time.Second)
close(c)
}
运行结果是:
1
其汇编部分代码如下:
CALL runtime.makechan(SB) // 对应 make(chan int32,1)
...
CALL runtime.chanrecv1(SB) // 对应 <-c
...
CALL runtime.chansend1(SB) // 对应 c<-1 其中 <- 编译后就代表运行时 chanrecv1 函数
...
CALL runtime.closechan(SB) // 对应 close(c)
CALL runtime.closechan(SB)
协程部分汇编代码,因为不是重点,略过 感兴趣的自己可以编译看看。
chan的所有内容都存放在runtime.hchan这个结构体中,makechan,chanrecv1和chansend1函数都是操作hchan这个结构体来实现chan的功能的。下面我们来看下 hchan结构体。
两种重要结构体
hchan 结构体
hchan结构体 如下
type hchan struct {
qcount uint // total data in the queue 环形队列里面的总的数据量
dataqsiz uint // size of the circular queue // 环形队列大小 就是 make chan时 申请的大小
buf unsafe.Pointer // points to an array of dataqsiz elements // 指向环形队列的指针
elemsize uint16 // 储存的元素类型占空间大小
closed uint32 // chan 状态 1 关闭 0 未关闭
elemtype *_type // element type // 元素类型 // make chan是 指定的类型 不过这个类型要进行运行时转换 但对应关系是这样的
sendx uint // send index // 发送索引
recvx uint // receive index // 获取索引
recvq waitq // list of recv waiters // 获取协程等待队列
sendq waitq // list of send waiters // 发送携程等待队列
lock mutex // 锁
}
waitq 结构体
waitq结构体用来存储阻塞在chan上的协程状态sudog结构体(内部包含了 协程信息)
其结构如下:
// 等待队列
type waitq struct {
first *sudog // 双向链表 头
last *sudog // 双向链表 尾
}
这个结构体有两个函数 enqueuq 和dequeue 插入链表和删除链表 就是双向链表的基础操作
hchan的结构丑图如下:
接下来我们按照 执行流程来梳理下部分源码 源码位置在 runtime/chan.go中,首先是 make(chan int32,1)函数,我们都知道 make函数可以初始化,map,slice和chan。编译时根据不同类型,会调用makechan,makeslcie和makemap函数。我们来看下 makechan函数。
makechan
其源码如下:
请结合比较丑流程图来看,比较好理解。
func makechan(t *chantype, size int) *hchan {
elem := t.Elem
// compiler checks this but be safe.
if elem.Size_ >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign {
throw("makechan: bad alignment")
}
// 计算内存
mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
// 如果chan是空的 则需要申请hchanSize空间 以满足 内存对齐要求
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.PtrBytes == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 分配内存
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// 初始化hchan
c.elemsize = uint16(elem.Size_)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.Size_, "; dataqsiz=", size, "\n")
}
return c
}
其实 就是 根据 chan 中元素类型和 大小 来计算需要的存储空间。逻辑还是比较清晰的。初始化了空间后,接下来就应该向chan存数据了,上例中是c <- 1,在编译时会转译成 chansend1函数。其源码如下:
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
它只是调用了 chansend 函数 且 阻塞 状态是 true(阻塞状态true代表 如果 chan 的 buf满员,则写协程放入sendq,如果 chan 为空,则读协程会放入 recvq。 如果阻塞状态是 false 如果chan buf,满员 则返回错误。如果chan 为空,则读协程读取chan返回错误。阻塞状态为false 主要是select函数用)。接下来我们来看下其源码:
chansend
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 省略部分代码
// chan没关闭 且 缓存buf已满 且 是非阻塞模式(select方法使用) 则不会写入 sendq里 直接返回错误
if !block && c.closed == 0 && full(c) {
return false
}
// todo
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 加锁
lock(&c.lock)
// 如果chan已经关闭 直接panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 有阻塞的取协程 送的数据 直接交给取协程 并将取协程唤醒
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 如果没有阻塞的获取协程 且 channel容量没满 则需要插入channel中
if c.qcount < c.dataqsiz {
// 获取要放的位置的指针
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
// 将元素 放入其中
typedmemmove(c.elemtype, qp, ep)
c.sendx++
// ring buffer 循环数组 到尾 就 从头开始
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
// 解锁
unlock(&c.lock)
return true
}
// buf满了 但是没有 阻塞 则直接返回失败
if !block {
unlock(&c.lock)
return false
}
// 如果channel满了 则所有send协程就需要加入 sendq 里排队,创建一个 等待状态的 sudog 包装当前 协程和数据 ep 放入 sendq中
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
// 初始化 sudog
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 放入 发送等待队列
c.sendq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
gp.parkingOnChan.Store(true)
// 调用GMP模型 阻塞协程 并释放 chan的 lock 锁,释放后别的协程可以进来
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
// ep保持活着 不被垃圾回收
KeepAlive(ep)
// 从等待队列唤醒
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
// 释放sudog
releaseSudog(mysg)
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
}
其执行主流程如下:
首先加锁
- 如果recvq里有数据,则将recvq链表头节点中的sudog拿出来,将send的数据直接交给sudog的recv协程,然后唤醒这个协程,最后释放当前send协程拥有的锁,返回。
- 否则 如果 hchan buf没满,则将数据存入其中,更新 qcount的值,释放锁,返回。
- 否则 初始化一个sudog并将 当前 send协程放入其中,并阻塞(调用GMP模型),然后释放当前send协程拥有的锁(别的send协程可以执行 chansend)
- 当前阻塞的send协程被待被recv协程唤醒。
- 唤醒后 将 当前协程状态变为非等待,释放当前协程对应的sudog 返回
这里有注意的点,sendq里的协程只能被 recv协程唤醒,反之亦然。这里就带来一个有趣的问题,sendq和recvq能都有数据吗?大神们可以思考下。
send讲完了,接下来改recv了,要不不就阻塞了吗,
例子中的 < - c 是 编译时会转译成 chanrecv1,我们来看下源码:
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
其block参数也是 true ,这阻塞跟 send一样。我们来看下 chanrecv方法吧
chanrecv
其源码如下:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 非主逻辑代码 跳过
lock(&c.lock)
// 如果chan已关闭 且qcount==0 则 证明chan中已没有数据 返回
if c.closed != 0 {
if c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// The channel has been closed, but the channel's buffer have data.
} else {
// 如果chan没关闭 先看 sendq里 有没有阻塞获取sudog 如果有 取出来 直接将 数据给其中的协程 并唤醒 阻塞的读操作 并解锁
// Just found waiting sender with not closed.
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
// 走到这里 证明 chan没关闭 且 sendq没数据 则从缓存区取数据
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
// 非阻塞(waitq不能有数据), 返回
if !block {
unlock(&c.lock)
return false, false
}
// 将 协程 构造sudog 存放到 recvq 中 阻塞协程, 下面代码跟 sendq类似
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
// 插入 recvq 链表尾部
c.recvq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}
其执行主流程如下:
首先加锁
- 如果协程已关闭 且 buf中没有数据 则 返回
- 如果协程没关闭 则从 sendq里取sudog数据,如果取到了 就 将 数据 传递给 sudog的send协程 当前recv协程释放锁,唤醒send协程 返回。
- 如果 没有从sendq里取到数据,则从 buf 里取数据,更新qcount 然后 解锁,返回
- 如果 buf 为空 ,则初始化一个sudog 将 当前协程放入其中,阻塞当前recv协程,释放当前recv协程拥有的锁。
- 等待其他send协程唤醒 当前recv协程。
- 唤醒后 将 当前协程状态变为非等待,释放当前协程对应的sudog 返回
send和recv后,接下来就该 close了
close( c)编译后 函数 closechan函数 我们来看下
closechan
源码如下:
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
// 加锁
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
racerelease(c.raceaddr())
}
// 将锁状态变为1
c.closed = 1
var glist gList
// 释放所有 读协程
// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// 释放所有写协程 包括其数据 所以 我们从关闭的协程里读的数据 就是 buf 中的 不会有 sendq里的数据
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// 将所有协程唤醒
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
close 主要是将阻塞的 其读和写协程 携带的 元素 释放,(但是缓存buf里的数据并没释放)可以使得GC捕获,然后将阻塞的协程唤醒。这时 在 chansend函数 lock()处 阻塞的协程会panic,被goready 唤醒的协程会正常退出;在 chanrecv 函数 lock()处 阻塞的协程(或者继续执行<-c的协程)会继续从 buf 拿数据,当数据获取完后,会退出。
总结
channel 其实就是用了共享内存加锁这种机制来处理协程之间的共享数据的,这次阅读源码还是有些细节没整明白,整体理解的也不够透彻,还望大神指正。