Channel 的使用
Channel 声明方法
- chInt := make(chan int) // unbuffered channel 非缓冲通道
- chInt := make(chan int, 0) // unbuffered channel 非缓冲通道
- chInt := make(chan int, 2) // bufferd channel 缓冲通道
Channel 基本用法
- ch <- x // channel 接收数据 x
- x <- ch // channel 发送数据并赋值给 x
- <- ch // channel 发送数据,忽略接受者
如果使用了非缓冲通道,此时向缓冲区塞数据需要有地方能立即接收数据,不然会一致阻塞。原理是此时缓冲区无数据(无缓冲),向 channel 发送数据视为直接发送,即直接发送到正在休眠等待的协程中。
func main() {
ch := make(chan string)
// 阻塞
ch <- "ping"
<-ch
}
启动协程来拿数据:
func main() {
ch := make(chan string)
// 程序能通过
go func() {
<-ch
}()
ch<-"ping"
}
内存与通信
不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存。主要是为了:
- 避免协程竞争和数据冲突。
- 更高级的抽象,降低开发难度,增加程序可读性。
- 模块之间更容易解耦,增强扩展性和可维护性。
通过共享内存案列:
func watch(p *int) {
for {
if *p == 1 {
fmt.Println("go")
break
}
}
}
func main() {
i := 0
go watch(&i)
time.Sleep(time.Second)
i = 1
time.Sleep(time.Second)
}
通过通信的方式如下:
func watch(c chan int) {
if <-c == 1 {
fmt.Println("go")
}
}
func main() {
c := make(chan int)
go watch(c)
time.Sleep(time.Second)
c <- 1
time.Sleep(time.Second)
}
Channel 的设计
Channel 在 Go 的底层表示为一个 hchan 结构体:
type hchan struct {
/* 缓存区结构开始 */
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
elemtype *_type // element type
/* 缓存区结构结束*/
// 发送队列
sendx uint // send index
sendq waitq // list of send waiters
// 接收队列
recvx uint // receive index
recvq waitq // list of recv waiters
lock mutex
// 0:关闭状态;1:开启状态
closed uint32
}
type waitq struct {
first *sudog
last *sudog
}
数据存放在一个环形缓冲区 Ring Buffer,可以降低内存/GC的开销
关于 c <- “x” 语法糖,channel 数据发送原理
Go 中会把 c<- 编译为 chansend1 方法:
// %GOROOT%src/runtime/chan.go
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
发送数据的 3 种情形:
- 缓冲区无数据,向 channel 发送数据视为直接发送,将数据直接拷贝给等待接收的协程的接受量,并唤醒该协程;如果无等待中的接收协程,则将数据放入缓冲区。
- 缓冲区有数据但缓冲区未满,则数据存到缓冲区。
- 接收队列中无休眠等待的协程,且缓冲区已满,则将数据包装成 sudog,放入 sendq 队列休眠等待,然受给 channel 解锁。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ...
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 1. 取出接收等待队列 recvq 的协程,将数据发送给它
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 接收等待队列中没有协程时
if c.qcount < c.dataqsiz {
// 2. 缓存空间还有余量,将数据放入缓冲区
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
// ...
// 3. 休眠等待
gp := getg()
// 包装为 sudog
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// 将数据,协程指针等记录到 mysq 中
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 将 mysg 自己入队
c.sendq.enqueue(mysg)
// 休眠
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
KeepAlive(ep)
// 被唤醒后再维护一些数据,注意,此时的记录的数据已经被拿走
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
// ...
}
关于 rec <-c 语法糖,channel 数据接收原理
Go 中会把 <-c 编译为 func chanrecv 方法,具体如下:
- 编译阶段,rec <- c 转化为
runtime.chanrecv1()
- 编译阶段,rec, ok <- c 转化为
runtime.chanrecv2()
- 最终会调用
chanrecv()
方法
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
接收数据的 4 种情形:
- 有等待中的发送协程(sendq),但缓冲区为空,从协程接收
- 有等待中的发送协程(sendq),但缓冲区非空,从缓存接收
- 接收缓存
- 阻塞接收
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// ...
if c.closed != 0 {
// ...
} else {
// 1.,2. 接收数据前,已经有协程在休眠等待发送数据
if sg := c.sendq.dequeue(); sg != nil {
// 在 1 的情况下,缓存为空,直接从 sendq 的协程取数据
// 在 2 的情况下,从缓存(缓冲)取走数据后,将 sendq 里的等待中的协程的数据放入缓存,并唤醒该协程
// 这就是为什么 sendq 队列中的协程被唤醒后,其携带的数据已经被取走的原因
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
// 3. 直接从缓存接收数据
if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
// ...
// 缓冲区为空,同时 sendq 里也没休眠的协程,则休眠等待
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
// 休眠
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// ...
}
从 Channel 接收数据的过程中被唤醒,说明之前因为没有数据而休眠等待,当发送方发送数据时,会主动将数据拷贝至接收方本地。