GO channel解析
是什么?
官方文档:
- https://go.dev/ref/spec#Channel_types
- https://go.dev/blog/pipelines(channel提供了流式编程的例子)
在 Go 语言中,channel 是一种用于在 goroutine 之间进行通信和同步的机制。可以用于在不同的 goroutine 之间传递数据。channel 可以是带缓存的或无缓存的,带缓存的 channel 可以缓存一定数量的元素,而在无缓存的 channel 中,发送和接收操作是同步的,这意味着发送操作会被阻塞直到有其他 goroutine 准备好接收数据,而接收操作也会被阻塞直到有其他 goroutine 准备好发送数据。channel 的使用可以有效地避免竞态条件和死锁等并发编程中的问题。
channel 满足并发编程中的通信和同步操作。通过 channel,不同的 goroutine 可以在不使用互斥锁等同步机制的情况下进行数据传递和共享,从而避免了竞态条件和死锁等并发编程中的问题。同时,channel 的使用可以使 goroutine 之间的通信变得更加清晰和简单,代码的可读性和可维护性也会得到提高。因此,channel 是 Go 语言中非常重要的并发编程机制之一。
使用方式
-
必须使用
make
来创建,nil channel是不能使用的func main() { chanStr := make(chan string, 1) // 创建一个缓存区长度为1的channel chanStr <- "测试" println(<-chanStr) var chanStr2 chan string println(<-chanStr2) // chan receive (nil chan) }
-
chan中存放和读取数据
func main() { chanStr := make(chan string, 1) chanStr <- "测试" //存放数据 res := <-chanStr // 读取数据 println(res) }
-
用完关闭channel
调用close关闭掉channnel
func main() { c := make(chan string) close(c) }
ps:用完关闭channel是很重要的,channel的关闭原则是 数据的发送发来关闭
-
channel的几种数据读取方式
-
for range
func main() { c := gen(1, 2, 3) for i := range c { println(i) } } // gen函数用来生成一个chan,chan里面输出的是生成的数据流 func gen(nums ...int) <-chan int { out := make(chan int) go func() { for _, n := range nums { // channel中发送数据 out <- n } // 在发送完毕之后关闭chanel close(out) }() return out }
这种方式很推荐,channel关闭后,会自动退出for range 循环。
-
直接读取
func main() { c := gen(1, 2, 3) r1 := <-c r2 := <-c r3 := <-c println(r1) // 1 println(r2) // 2 println(r3) // 3 } func gen(nums ...int) <-chan int { out := make(chan int) go func() { for _, n := range nums { out <- n } close(out) }() return out }
每次读取一个数据
但这样有个问题:channel被关闭之后,还可以继续读取数据,读取的是chan中类型的零值,int的零值就是0
func main() { c := gen(1, 2, 3) r1 := <-c r2 := <-c r3 := <-c println(r1) // 1 println(r2) // 2 println(r3) // 3 println(<-c) // 0 再次读取,读取的是零值 println(<-c) // 0 }
这种就有问题了,这种方式怎么获取channel是否被关闭的标示呢?就有了下面的方式。
这种方式并不是完全没有用处,可以达到广播的作用。
channel中的数据不是共享的,一份数据不同的goRoutine读取的到的是不一样的,一个数据,被goRoutine读取了就没有了,下一个goRoutine是不会读取到它的。但channel被关闭掉之后,所有的goRoutine读取的都一样了,都是零值
代码如下
func main() { c := make(chan int, 10) doneC := make(chan struct{}) for i := 0; i < 2; i++ { // 启动两个goRoutine go func() { for { i := rand.Int() select { case c<-i: case <-doneC: // done中有信号了,结束,返回 println("stop process") return } time.Sleep(1*time.Second) } }() } println(<-c) println(<-c) close(doneC) time.Sleep(time.Hour) } //output: 8674665223082153551 5577006791947779410 stop process stop process
-
直接读取,并且携带读取成功标志位
func main() { c := gen(1, 2, 3) r1 := <-c r2 := <-c r3 := <-c println(r1) // 1 println(r2) // 2 println(r3) // 3 r4,ok := <-c fmt.Printf("res:%d,ok:%v\n",r4,ok) // res:0,ok:false }
-
关闭的channel和未初始化的channel
关闭的channel
具体的分析可以看下面源码分析/读取数据部分
- channel中的数据已经读取完毕,此时,继续读取,读取的都是channel中指定类型的0值。
- channel中的数据没有读取完毕,此时,继续读取,读取的都是channel中的数据。
- 不能写入
- 调用
len
返回channel中元素的个数,cap
返回channel的容量
未初始化的channel
- 不能读取数据
- 调用
len
返回0,cap
返回0 close
报错
带有方向的channel
通过 <-
来指定channel的只读还是只写
package main
import "fmt"
func main() {
// 创建一个双向通道
ch1 := make(chan int)
// 创建一个只能发送数据的通道
ch2 := make(chan<- int)
// 创建一个只能接收数据的通道
ch3 := make(<-chan int)
// 打印通道的类型
fmt.Printf("ch1 type: %T\n", ch1)
fmt.Printf("ch2 type: %T\n", ch2)
fmt.Printf("ch3 type: %T\n", ch3)
}
//output:
ch1 type: chan int
ch2 type: chan<- int
ch3 type: <-chan int
在这个例子中,我们分别创建了一个双向通道 ch1
、一个只能发送数据的通道 ch2
和一个只能接收数据的通道 ch3
。通过使用 <-
操作符来指定通道的方向,我们可以将通道限制为只能发送或只能接收数据。在打印通道的类型时,可以看到它们的类型分别是 chan int
、chan<- int
和 <-chan int
,这是由通道的方向性所决定的。
ps: 通道的方向性只是在编译期起作用,对于运行时的数据传输没有任何影响。因此,即使将一个双向通道赋值给一个只能发送或只能接收数据的通道,也不会影响通道的数据传输。 只能是双向channel转单向channel,不能单项channel转双向channel
代码如下:
func main() {
ch1 := make(chan int)
ch2 := (chan<- int)(ch1)
ch3 := (<-chan int)(ch1)
fmt.Printf("%T\n",ch1)
fmt.Printf("%T\n",ch2)
fmt.Printf("%T\n",ch3)
}
//output:
chan int
chan<- int
<-chan int
func main() {
ch1 := make(chan int,1)
ch1<-1
go f1(ch1)
time.Sleep(time.Hour)
}
func f1(d <-chan int) { // 形参可以直接转换
println(<-d) //1
}
源码解析
底层操作
利用dlv的disass
来查看汇编代码,找出对应的底层函数
这里列举几个常用,剩下的可以自行看
操作 | 对应的底层函数 |
---|---|
make(chan int) | runtime.makechan |
channel<-1 (channel中写数据) | $runtime.chansend |
<-channel (channel中读数据) | $runtime.chanrecv |
close(channel) 关闭channel | $runtime.closechan |
下面呢,我们先给出channel的图,方便理解之后的操作
底层结构体
源码链接:https://github.com/golang/go/blob/release-branch.go1.19/src/runtime/chan.go#L33
channel的底层结构体如下:
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
type waitq struct { // 等待队列
first *sudog
last *sudog
}
对应的图如下:
结合之前Java的同步队列和ReentrantLock 的实现原理,在加上channel的实际表现,可以总结如下:
channel底层维护了一个循环队列,队列中存放的是channel中需要同步的数据,并未维护了发送和接受索引,还有发送和接受的等待队列,我们在make的时候指定容量其实就是底层队列的长度。
发送数据的时候,依次是先看是否有正在等待的接收方,如果有直接copy数据(这样做可以省略掉再次往队列中copy数据),其次,再看缓存区是否可以存放(qcount和dataqsize的关系),通过snedx来指定存放的位置,再者,构建等待队列,当前发送数据的goroutine堵塞,构建发送方等待队列,等待接收方唤醒。
接受数据的操作也一样
创建channel
创建channel主要就是创建hchan
结构体,主要包括
- 分配内存
- 初始化锁和属性
func makechan(t *chantype, size int) *hchan {
// chann中元素的类型
elem := t.elem
// compiler checks this but be safe.
// chan中元素的大小是是否超出了限制
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// 计算chan需要的内存大小,其实就是算里面内存的大小*长度
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// channel中的元素不包含指针的时候,hchan中也不包含指针
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
// 直接分配了一个hchan
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
// 竞争检测器使用此位置进行同步
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// 不包含指针
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
// 在hchannsize上增加了mem大小
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
// 找到buf,也就是底层的数组
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
//分配底层空间
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size) // 元素的size
c.elemtype = elem
c.dataqsiz = uint(size) // 队列的长度
lockInit(&c.lock, lockRankHchan) // 初始化锁
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
通过下面代码可以知道,channel是引用传递的
,因为返回的是指针,验证代码如下
func main() {
ch1 := make(chan int,1)
f1(ch1)
fmt.Printf("%p \n",ch1)
}
func f1(d <-chan int) {
fmt.Printf("%p\n",d)
}
//output:
0xc00001c0e0
0xc00001c0e0
写入数据
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
if raceenabled {
racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
}
//
if !block && c.closed == 0 && full(c) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 上锁
lock(&c.lock)
// 已经关闭,这里可以看到channel被关闭,是不能再发送数据的
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 等待队列中有等待,直接发送数据
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 走到这说明没有等待队列
if c.qcount < c.dataqsiz {
// 队列中还有空间
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx) // 找到发送index,这返回的是一个Pointer
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep) // copy值
c.sendx++ //发送索引++
if c.sendx == c.dataqsiz {
// 归0
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
if !block {
unlock(&c.lock)
return false
}
// Block on the channel. Some receiver will complete our operation for us.
gp := getg() // 这里要开始堵塞了 // 拿到goRoutine
mysg := acquireSudog() // 构建发送的等待队列
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg) // 入队
atomic.Store8(&gp.parkingOnChan, 1)
// goRoutine就park在这里了
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
KeepAlive(ep) // 保证此对象一直存活
// 当前goroutine被唤醒,唤醒操作不在这里,在chanrecv,并且在它里面有赋值操作,将缓冲区的数据copy到它里面
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success // 表示是否成功,用于下面的判断, 表示channel是否关闭,也就是说channel关闭,goroutine幻想,并且报错
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
}
// 发送操作
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 这是的竞争检测
if raceenabled {
if c.dataqsiz == 0 {
racesync(c, sg)
} else {
// Pretend we go through the buffer, even though
// we copy directly. Note that we need to increment
// the head/tail locations only when raceenabled.
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
}
if sg.elem != nil {
// 这里是直接发送,就是将ep copy到sg中
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 唤醒
goready(gp, skip+1)
}
上面的代码我增加了注释,其中关于go park goroutine的操作我没有看懂,之后又机会会在分析一下,但这不影响看整体代码逻辑,我对上面代码的总结如下:
-
操作之前先上锁,操作完之后记得要释放锁
-
close的channel是不能发送数据的
-
接收方的等待队列中出队一个元素,如果有,直接将发送的值copy到这个元素中,并且唤醒它,函数结束
-
查看循环队列(缓冲区)中还有还有空间,将当前元素存放在这里通过sendX,函数返回
-
构建sudog,存放在发送方的等待队列中,当前goroutine park住。等待从channel中接收数据的时候唤醒它。
不带缓存区的channel和带缓存区的channel有啥区别?
一个有循环队列,一个没有,对于上面的逻辑来说,没有循环队列,就会直接park住当前的goroutine。
读取数据
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// block表示是否阻塞
if debugChan {
print("chanrecv: chan=", c, "\n")
}
// channel没有初始化
if c == nil {
if !block { // 非阻塞
return
}
// 这里表示堵塞,当前goroutine park住
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if !block && empty(c) {
if atomic.Load(&c.closed) == 0 {
return
}
if empty(c) {
// The channel is irreversibly closed and empty.
if raceenabled {
raceacquire(c.raceaddr())
}
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 加锁
lock(&c.lock)
// channel是否关闭
if c.closed != 0 { // channel已经关闭
//buffer中没有数据
if c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
// 清空ep,这个时候ep返回的就是这个类型的0值
typedmemclr(c.elemtype, ep)
}
// 从这里开始,channel已经关闭,并且buffer中没有数据
// 所以,我们在使用channel的时候,比如 a,ok:= <-c ,ok并不是表示当前channel是否关闭,而是表示当前操作是否接收到数据
return true, false
}
// 走到这里,说明channel已经关闭,并且buffer中有数据
} else {
//channel没有关闭,
// 发送队列中出队
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
// 发送数据
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
// buffer中有数据 走到这里有下面的几种可能性
// 1. channel没有关闭,并且没有发送方的等待队列
// 2. channel关闭,但buffer中有数据,并且没有发送的等待队列
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx) //找到接收位
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp) //移动数据,将数据从buffer中移动到ep中(qp->eq)
}
typedmemclr(c.elemtype, qp) // 会将buffer中的内存清楚掉
c.recvx++ // 索引++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount-- // 数量--
unlock(&c.lock)
return true, true
}
if !block {
unlock(&c.lock)
return false, false
}
// 到这里,需要park住当前goroutine
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg) // 构建等待队列
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) // 当前goroutine park住
// 被唤醒
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg) // 释放sudog
return true, success
}
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// buffer长度为0
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
if ep != nil {
// 直接将数据将数据copy到ep中
recvDirect(c.elemtype, sg, ep)
}
} else {
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
// 拿到读取的index
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
}
// copy data from queue to receiver
if ep != nil {
// 从buffer中copy到ep中
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queu
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 发送的x = 接收的x
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 唤醒等待的goroutine
goready(gp, skip+1)
}
上面代码解释如下:
简单来说,和上面发送是一样的流程
- 加锁,操作结束之后需要释放锁
- channel已经关闭并且channel中没有数据,清空内存,返回false
- 发送等待队列中是否有等待的groutine,如果有出队,并且从buffer(recvx)中捞一个数据,给等待的goRoutine,并且唤醒。
- 看buffer中是否还有数据,有的话,就通过recvx来获取一个元素,并且返回
- 构建sudog,如读取等待队列,park住当前goroutine,等待写入操作唤醒
需要注意:
通过上面的代码可以知道下面的几个问题
- 直接读取,并且携带读取成功标志位的这种读取方式,返回的bool表示的是
是否读取到值,也不是channel是否关闭
, - channel关闭后,如果channel的buffer中还有数据,还是可以读取到的,并且boo值为true,只有在channel中buffer没有数据,bool返回为false
// demo1
func main() {
// buffer长度为10
intC := make(chan int, 10)
// 发送10个数据
for i := 0; i < 10; i++ {
intC <- i
}
// 关闭channel
close(intC)
// 从channel中读取数据,到此时,channel已经关闭了,但还是可以继续读取出来channel中的数据
for i := range intC {
println(i)
}
}
// demo2
func main() {
// buffer长度为10
intC := make(chan int, 2)
// 发送10个数据
for i := 0; i < 2; i++ {
intC <- i
}
// 关闭channel
close(intC)
// 从channel中读取数据,到此时,channel已经关闭了,但还是可以继续读取出来channel中的数据
i,ok := <-intC // buffer中还有数据,所以ok为true
println(i,ok)
i1,ok1 := <-intC
println(i1,ok1)
i2,ok2 := <-intC // buffer中没有数据,所以ok为false,并且当前线程不会park
println(i2,ok2)
i3,ok3 := <-intC
println(i3,ok3)
}
//output:
0 true
1 true
0 false
0 false
关闭channel
func closechan(c *hchan) {
// nil的channel是不能关闭的
if c == nil {
panic(plainError("close of nil channel"))
}
// 加锁
lock(&c.lock)
if c.closed != 0 {
// 已经关闭过了,报错
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
racerelease(c.raceaddr())
}
// 关闭
c.closed = 1
var glist gList
// release all readers
// 释放所有的reader。释放接收方等待队列
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
// 清空元素,并且清空内存
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
// 表示当前goroutine并没有成功获取到数据,在读取数据的时候,goroutine park唤醒之后返回的就是success
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
// 唤醒等待的goroutine
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
对应解释如下
- 一堆判断
- 设置closed标志位
- 释放所有的发送等待方的队列和发送者的队列。
结合上面写入和读取数据的逻辑,总结如下
-
发送
释放所有的发送者的等待队列,报错。已关闭的channel不能发送数据
-
读取
释放所有的等待队列,不报错,返回0值
验证代码如下:
- 发送
func main() {
ints := make(chan int)
for i := 0; i < 10; i++ {
go func(index int) {
ints <- index
println(index,"done")
}(i)
}
time.Sleep(1* time.Second)
close(ints)
time.Sleep(time.Hour)
}
//output
panic: send on closed channel
goroutine 9 [running]:
main.main.func1(0x6)
/Users/lc/GolandProjects/awesomeProject4/main.go:9 +0x30
created by main.main
/Users/lc/GolandProjects/awesomeProject4/main.go:8 +0x40
-
接收
func main() { ints := make(chan int) for i := 0; i < 10; i++ { go func(index int) { i,ok := <-ints println(index,"done",i,ok) }(i) } time.Sleep(1* time.Second) close(ints) time.Sleep(time.Hour) } //output: 4 done 0 false 6 done 0 false 5 done 0 false 9 done 0 false 0 done 0 false 8 done 0 false 3 done 0 false 2 done 0 false 1 done 0 false 7 done 0 false
关于channel的分析就到这里了