Golang:context基于go1.22版本
- chan的作用和使用方法
- 共享内存的优缺点
- chan 的使用方法
- chanel 的底层结构
- channel 结构体
- 创建channel
- 写流程
- 异常处理
- 写时有阻塞读流程
- 写时无阻塞读流程,缓冲区有空间
- 写时无阻塞读流程,缓冲区无空间
- 写流程整体架构
- 读流程
- 异常处理
- 读时有发送阻塞协程
- 读时无阻塞写协程,缓存有数据
- 读时无阻塞写协程,缓存无数据
- 关闭
- 小结
chan的作用和使用方法
当我们了解chan的底层原理之前,应该先先想一想chan解决了哪些问题?假如没有chan,那么在协程之间怎么通信?
Golang官方有一个很经典的话,不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存
。下面讲解一下共享内存的优缺点,能够直观的展示go为什么通过共享内存解决并发安全。
共享内存的优缺点
缺点:
- 如果没有chan,通过共享内存的方式来解决并发安全性问题,这会导致大量的线程在同一时间访问同一个内存地址,会出现
竞争态,可能会导致数据不一致
。 - 复杂的同步原语: 如果想实现并发安全,那么就需要编写大量的同步锁,这样会让开发人员编写代码变得繁琐
优点:就一个字快
而通过通信就能解决以上问题,channel是一种类似于FIFO队列的数据结构,chan的每次发送和他能保证数据在发送和接受时是按照顺序传输的,保证了数据一致性,接受都是原子的,所以不会存在竞争态。
chan 的使用方法
channel有两种初始化方式,分别是带缓存和不带缓存的:
c := make(chan int) // 无缓存
a := make(chan int, 10)
使用方式:
发送
c <- 4
接受 一
b := <- c
接受 二
b , ok := <-c
会判断当前的chan是否被关闭
chanel 的底层结构
源码位于runtime/chan.go下,有兴趣的可以看看源码实现,本块会分为五部分讲解,channel结构,创建channel,发送数据,接受数据,关闭channel。
channel 结构体
const (
maxAlign = 8 // 用于内存对齐,分配的空间都是8的倍数
hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
debugChan = false
)
type hchan struct {
qcount uint // 队列中数据的个数
dataqsiz uint // 环形数组的大小,channel本身就是一个环形队列
buf unsafe.Pointer // 存放的是实际数据的指针
elemsize uint16 // 元素的大小
closed uint32 // 标识channel是否被关闭
elemtype *_type // 数据的元素类型
sendx uint // 发送的指针
recvx uint // 接受的指针
recvq waitq // 阻塞的接受队列
sendq waitq // 阻塞的发送队列
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex // 锁
}
type waitq struct {
first *sudog // 队列头部
last *sudog // 队列尾部
}
// 等待g队列
type sudog struct {
g *g // goroutine,协程
next *sudog // 队列的下一个节点
prev *sudog // 队列的前一个节点
elem unsafe.Pointer //读取/写入 channel 的数据的容器
// 读通道 : 数据会从hchan的队列中,拷贝到sudog的elem中
// 写通道 : 与读通道类似,是将数据从 sudog 的elem处拷贝到hchan的队列中
acquiretime int64
releasetime int64
ticket uint32
isSelect bool // 标识当前是否在select多路复用下
success bool
waiters uint16
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // 标识与当前sudog交互的chan
}
从数据结构中我们不难看出,channel本身就是一个环形缓冲区,数据都在堆上面,因为channel避免不了并发访问,所以使用 同步锁来保证并发安全
。channel 中有一个接受阻塞队列,一个是发送阻塞队列,当向一个已经满的channel发送数据会被阻塞,此时就会把发送的goroutine
添加到sendq中,同理,向一个空的channel接受数据也会阻塞,并添加到recvq中。
创建channel
当我们通过make创建一个chan的时候,会调用makechan函数
func makechan(t *chantype, size int) *hchan {
elem := t.Elem
// 判断元素类型的大小
if elem.Size_ >= 1<<16 {
throw("makechan: invalid channel element type")
}
// 内存对齐限制
if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign {
throw("makechan: bad alignment")
}
mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
var c *hchan
switch {
// 无缓冲类型
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.PtrBytes == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.Size_)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.Size_, "; dataqsiz=", size, "\n")
}
return c
}
- 判断申请内存空间是否越界,men大小为element 类型的大小和个数相乘的到的,所以当个数为0时,就是无缓冲channel
- 根据类型,分为无缓冲类型,有缓冲非pointer类型,有缓冲pointer类型
- 如果为无缓冲类型,申请一个默认大小为96的空间
- 如果有缓冲的非pointer类型,则一次分配好96 + mem 大小的空间,此时buf为96+元素大小*元素个数的连续内存
- 如果有缓冲的pointer类型,则分别申请 chan 和 buf 的空间,两者无需连续
写流程
异常处理
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
throw("unreachable")
}
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
}
- 如果chan仅仅被声明但是没有被初始化就会引发死锁
- 如果向已经关闭的chan,写入数据会panic
写时有阻塞读流程
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if raceenabled {
if c.dataqsiz == 0 {
racesync(c, sg)
} else {
// Pretend we go through the buffer, even though
// we copy directly. Note that we need to increment
// the head/tail locations only when raceenabled.
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
}
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
- 如果能从等待读队列读取到
goroutine
证明有读协程阻塞,此时直接将元素copy到读取到的goroutine
中 - 如果 sg.elem 不为 nil,说明发送的数据不是空的,那么就调用 sendDirect 函数直接将数据发送到通道中。然后将 sg.elem 置为 nil,表示数据已经成功发送。
写时无阻塞读流程,缓冲区有空间
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
- 如果当前队列的元素个数小于缓冲区的个数,那么缓冲区还有空间
- 将当前元素添加到,缓冲区sendx对应的位置,然后send ++ ,如果元素满了,把sendx重新移到队首,然后解锁返回
写时无阻塞读流程,缓冲区无空间
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)
KeepAlive(ep)
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
- 先创建一个g和一个等待g队列,然后建立sudo,g,chan之间的关系
- c.sendq.enqueue(mysg)sudog添加到当前chan写阻塞队列,
- gp.parkingOnChan.Store(true) 意思是说明当前g是在某个chan等待执行,这使得调度器知道每个g的状态
- gopark()将当前的
goroutine
置于休眠状态等待被唤醒 - KeepAlive(ep)即确保被发送的值在接收方将其复制出去之前保持存活,在发送ep之后保证不被垃圾回收,只有等待接收方将数据结构复制后,在进行回收
- 后续就是被唤醒后的操作,回收sudog
写流程整体架构
读流程
异常处理
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// raceenabled: don't need to check ep, as it is always on the stack
// or is new memory allocated by reflect.
if debugChan {
print("chanrecv: chan=", c, "\n")
}
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
throw("unreachable")
}
if c.closed != 0 {
if c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// The channel has been closed, but the channel's buffer have data.
}
}
- 如果读取的chan没有进行初始化那么直接panic,死锁。
- 如果通道已经关闭,并且通道内没有元素,那么直接回收垃圾返回
读时有发送阻塞协程
/ Just found waiting sender with not closed.
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
}
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
- 如果能够从sendq发送阻塞队列中读取数据,则判断 channel是否有缓冲,如果无缓冲则对sendq队列中的元素copy,倘若有缓冲,则队列已满,接收方需要取队列头部的元素,然后将发送方的数据放入队列中,。然后将应的
goroutine
标记为可执行状态。
读时无阻塞写协程,缓存有数据
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
- 先获取到接受缓冲区c.recvx 所指向的槽位的指针,然后将缓冲区的数据copy到当前读取的chan中。最后更新索引revx的位置,解锁返回
读时无阻塞写协程,缓存无数据
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
- 先创建一个g和一个等待g队列,然后建立sudo,g,chan之间的关系
- c.recvq.enqueue(mysg)sudog添加到当前chan写阻塞队列,
- gp.parkingOnChan.Store(true) 意思是说明当前g是在某个chan等待执行,这使得调度器知道每个g的状态
- gopark()将当前的
goroutine
置于休眠状态等待被唤醒 - 后续就是被唤醒后的操作,回收sudog
关闭
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
racerelease(c.raceaddr())
}
c.closed = 1
var glist gList
// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
- 关闭未初始化的和已经关闭的channel会panic
- 将阻塞读协程队列中的协程节点统一添加到 glist
- 将阻塞写协程队列中的协程节点统一添加到 glist
- goready将所有的写成都写到等待执行队列
小结
读完源码你会发现,上面的代码都是讲阻塞模式下的channel,那么有没有非阻塞的呢?有的,那就是使用select多路复用,但是要设置default,后续我会专门讲解select源码解析
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
return chanrecv(c, elem, false)
}
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, getcallerpc())
}
非阻塞模式会吧block设置成false,流程还是和阻塞一样,