go的通道channel是用于协程之间数据通信的一种方式
一、channel的结构
go源码:GitHub - golang/go: The Go programming language
src/runtime/chan.go
type hchan struct {
qcount uint // total data in the queue 队列中当前元素计数,满了就=dataqsiz
dataqsiz uint // size of the circular queue 环形队列大小(缓存大小)
buf unsafe.Pointer // points to an array of dataqsiz elements 指向任意类型的指针
elemsize uint16 //元素大小
closed uint32 //是否关闭,0-未关闭,1-已关闭
timer *timer // timer feeding this chan //定时器
elemtype *_type // element type //元素类型
sendx uint // send index //发送索引
recvx uint // receive index //结束索引
recvq waitq // list of recv waiters //接收等待队列(<-ch)
sendq waitq // list of send waiters //发送等待队列(ch<-)
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex //锁,保护hchan中的所有字段
}
type waitq struct { //等待队列,sudog双向链表结构
first *sudog //(伪g)表示等待列表中的g,例如在一个通道上用于发送/接收的g
last *sudog //用acquireSudog分配,releaseSudog释放
从结构体上可以记住channel的一些特点,比如说
(1)lock 锁:操作channel是互斥的。先获取锁,操作channel,释放锁
(2)elemtype类型:创建的时候必须指定类型(大小可指定) ch := make(chan int,10)
(3)waitq队列:FIFO先进先出队列,即通道,能通过任意类型(unsafe.Pointer)的数据,(sudog)双向链表的g
(4)dataqsiz通道容量:有值=有缓冲通道,没值=无缓冲通道
(5)qcount通道元素计数:当前通道内的元素个数总数
(6)接受和发送:通信,有人发还要有人收,意味必须2个g及以上的成员一起工作
(7)timer定时器:定时可对channel做特殊操作
(8)closed关闭:写(发送)已关闭通道会panic,读(接收)已关闭通道立刻返回:true,false
二、channel创建
创建:make(chan类型 元素类型,缓冲容量大小),var chan类型 元素类型
func Test_2(t *testing.T) {
ch1 := make(chan int) //双向
ch11 := make(chan int, 10) //双向,带缓冲容量10
ch2 := make(chan<- int) //只写
ch22 := make(chan int, 10) //只写,带缓冲容量10
ch3 := make(<-chan float64) //只读
ch33 := make(chan int, 10) //只读,带缓冲容量10
//go1.17_spec.html
//chan T // can be used to send and receive values of type T
//chan<- float64 // can only be used to send float64s
//<-chan int // can only be used to receive ints
var ch4 chan int
//通道是引用类型,通道类型的空值是nil。
g.Dump(ch1, ch11, ch2, ch22, ch3, ch33, ch4)
//打印
//<chan int>
//<chan int>
//<chan<- int>
//<chan int>
//<<-chan float64>
//<chan int>
//<chan int>
}
(1)channel通道类型:
ChannelType = ( "chan" | "chan" "<-" | "<-" "chan" )=(双向|单向发送(写)|单向接收(读))
只写:操作读会报错。
使用场景,上下文:src/context/context.go的Context,
type Context interface {
...
// a Done channel for cancellation.
Done() <-chan struct{}
...
}
只读:操作写会报错
使用场景,上下文:src/os/signal/signal.go的handlers,
var handlers struct {
sync.Mutex
// Map a channel to the signals that should be sent to it.
m map[chan<- os.Signal]*handler
...
}
type stopping struct {
c chan<- os.Signal
h *handler
}
(2)channel数据类型:
任意,如:int,float64,bool,map...
(3)缓冲容量
第二个参数给定数量,如10
无缓冲通道:ch := make(chan int);
有缓冲通道:ch := make(chan int,10)
三、向channel写数据
channel的发送要注意区分【有缓冲容量】和【无缓冲容量】
1.有人接收:正常发送
func Test_send1(t *testing.T) {
ch := make(chan int) //双向
//开启goroutine将1~5的数发送到ch中
go func() {
for i := 1; i <= 5; i++ {
fmt.Println("写入ch 元素:", i)
ch <- i
}
close(ch) //写完,关闭通道
}()
//在主goroutine中从ch中接收值打印
for i := range ch {
fmt.Println("读取ch 结果:", i)
}
//写入ch 元素: 1
//写入ch 元素: 2
//读取ch 结果: 1
//读取ch 结果: 2
//写入ch 元素: 3
//写入ch 元素: 4
//读取ch 结果: 3
//读取ch 结果: 4
//写入ch 元素: 5
//读取ch 结果: 5
//主goroutine和goroutine读写互斥,相互竞争锁。直到通道关闭主程结束
}
2.没人接收:
看通道容量情况:无缓存通道,容量0,一个都发不出,直接阻塞
注意:select是非阻塞发送,会直接返回
func Test_send2(t *testing.T) {
ch := make(chan int) //双向
//开启goroutine将1~5的数发送到ch中
go func() {
for i := 1; i <= 5; i++ {
fmt.Println("写入ch 元素:", i)
ch <- i
}
close(ch) //写完,关闭通道
}()
//写入ch 元素: 1
//无缓存通道:没人接收,尝试发送1时,g被阻塞
}
思考为什么读要先于发?尝试从源码角度分析,因为无缓存channel的接收方会从发送方栈拷贝数据后,发送方才会被放回调度队列种,等待重新调度,如果一直没有读,发就一直卡住,无法被唤醒
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
...
}
//1.c通道
//2.发送方sg发送的值被放入通道中,发送方被唤醒,继续它的快乐之路
//3.接收方接收到的值(当前G)为写入ep
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
//无缓冲读
if c.dataqsiz == 0 {
...
if ep != nil {
// copy data from sender 接收是直接从发送的栈进行拷贝
recvDirect(c.elemtype, sg, ep)
}
} else {
//有缓冲读
// 从缓存队列拷贝
qp := chanbuf(c, c.recvx)
...
}
gp.param = unsafe.Pointer(sg)
...
//唤醒g准备执行
goready(gp, skip+1)
}
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
// dst is on our stack or the heap, src is on another stack.
// The channel is locked, so src will not move during this
// operation.
src := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
//从"from"拷贝n个字节到"to",from是src发送方,to就是dst接受方
memmove(dst, src, t.Size_)
}
func memmove(to, from unsafe.Pointer, n uintptr)
有缓存通道,容量5,能先发5个,第6个阻塞
func Test_send4(t *testing.T) {
ch := make(chan int, 5) //双向
//开启goroutine将1~5的数发送到ch中
go func() {
for i := 1; i <= 10; i++ {
fmt.Println("写入ch 元素:", i)
ch <- i
}
close(ch) //写完,关闭通道
}()
//写入ch 元素: 1
//写入ch 元素: 2
//写入ch 元素: 3
//写入ch 元素: 4
//写入ch 元素: 5
//写入ch 元素: 6
//无缓存通道:没人接收,前5个元素成功发送,尝试发送6时,g才被阻塞
}
3.发送到已关闭的通道,会panic
func Test_send3(t *testing.T) {
ch := make(chan int) //双向
close(ch) //关闭通道
ch <- 1
//在主goroutine中从ch中接收值打印
for i := range ch {
fmt.Println("读取ch 结果:", i)
}
//panic: send on closed channel
}
发送小结:
(1)如果channel为nil,如果非阻塞式发送(select send),直接返回false,否则阻塞
(2)如果channel已关闭,直接panic
(3)如果recevq等待队列有接收方,直接拷贝数据给接收方,并唤醒接收方的g
(4)如果channel缓冲区未满,发到缓冲区,否则阻塞,”保护发送现场“等待被唤醒
四、向channel读数据
1.有人发送:通道有值,遍历通道,如果通道未关闭,读完元素后,会报死锁的错误
func Test_read1(t *testing.T) {
ch := make(chan int, 5) //双向
for i := 1; i <= 5; i++ {
fmt.Println("写入ch 元素:", i)
ch <- i
}
close(ch)
for i := 1; i < 9; i++ {
v, ok := <-ch
fmt.Println(v, ok)
}
//写入ch 元素: 1
//写入ch 元素: 2
//写入ch 元素: 3
//写入ch 元素: 4
//写入ch 元素: 5
//1 true
//2 true
//3 true
//4 true
//5 true
//fatal error: all goroutines are asleep - deadlock!
}
2.没人发送:
(1)make或var初始化之后就读取,不管有无容量,都会一直阻塞,等待写入本次实验go版本(go1.20.5 )
注意:如果channel有值,并且一直没关闭,一直for循环读,读完之后会报死锁的错误。
func Test_read2(t *testing.T) {
ch := make(chan int) //双向
for i := 1; i < 9; i++ {
v, ok := <-ch
fmt.Println(v, ok)
}
}
func Test_read3(t *testing.T) {
ch := make(chan int, 5) //双向
for i := 1; i < 9; i++ {
v, ok := <-ch
fmt.Println(v, ok)
}
}
func Test_read4(t *testing.T) {
var ch chan int
for i := 1; i < 9; i++ {
v, ok := <-ch
fmt.Println(v, ok)
}
}
3.读取已关闭的通道,会不会panic?
答案是:不会panic,不影响
已关闭通道,如果有值,返回:值,true
已关闭通道,如果没值,返回:0,false
func Test_read5(t *testing.T) {
ch := make(chan int) //双向
close(ch)
for i := range ch {
fmt.Println("读取ch 结果:", i)
}
//0 false
//0 false
//0 false
//0 false
//0 false
//0 false
//0 false
//0 false
}
func Test_read6(t *testing.T) {
ch := make(chan int, 5) //双向
for i := 1; i <= 5; i++ {
fmt.Println("写入ch 元素:", i)
ch <- i
}
close(ch)
for i := 1; i < 9; i++ {
v, ok := <-ch
fmt.Println(v, ok)
}
//写入ch 元素: 1
//写入ch 元素: 2
//写入ch 元素: 3
//写入ch 元素: 4
//写入ch 元素: 5
//1 true
//2 true
//3 true
//4 true
//5 true
//0 false
//0 false
//0 false
}
4.for Range
for ... range阻塞式读取channel的值,直到channel被关闭
func Test_read(t *testing.T) {
ch := make(chan int) //双向
//开启goroutine将1~5的数发送到ch中
go func() {
for i := 1; i <= 5; i++ {
fmt.Println("写入ch 元素:", i)
ch <- i
}
close(ch) //写完,关闭通道
}()
//在主goroutine中从ch中接收值打印
for i := range ch {
fmt.Println("读取ch 结果:", i)
}
//写入ch 元素: 1
//写入ch 元素: 2
//读取ch 结果: 1
//读取ch 结果: 2
//写入ch 元素: 3
//写入ch 元素: 4
//读取ch 结果: 3
//读取ch 结果: 4
//写入ch 元素: 5
//读取ch 结果: 5
//主goroutine和goroutine读写互斥,相互竞争锁。直到通道关闭主程结束
}
若close(ch)注释掉,读完数据之后,还继续读,会报死锁的错误。
func Test_read(t *testing.T) {
ch := make(chan int) //双向
//开启goroutine将1~5的数发送到ch中
go func() {
for i := 1; i <= 5; i++ {
fmt.Println("写入ch 元素:", i)
ch <- i
}
//close(ch) //写完,关闭通道
}()
//在主goroutine中从ch中接收值打印
for i := range ch {
fmt.Println("读取ch 结果:", i)
}
//写入ch 元素: 1
//写入ch 元素: 2
//读取ch 结果: 1
//读取ch 结果: 2
//写入ch 元素: 3
//写入ch 元素: 4
//读取ch 结果: 3
//读取ch 结果: 4
//写入ch 元素: 5
//读取ch 结果: 5
//fatal error: all goroutines are asleep - deadlock!
}
读取小结:
(1)如果channel为nil,如果非阻塞式接收(select receive),直接返回(false,false),否则阻塞
(2)如果channel计时器不为nil,检查计时,做超时处理
(3)如果channel上有可以接收的数据(empty函数),且是阻塞读(block=false),在channel未关闭时,返回(false,false),如果未关闭,再次检查empty函数,没可接收数据,返回(true,false)
(4)当channel已关闭,如果是无缓冲,返回(0,false),如果有缓存,执行recv()方法
从sendq队列或缓冲区中拷贝数据
(5)当channel未关闭,有缓冲,读缓冲数据(如果非阻塞式接收(select receive),直接返回(false,false))
(6)当缓冲数据读完了,”保护接收现场“,等待被唤醒
五、channel死锁问题
1.同一个goroutine上执行
(1).未初始化的channel,读死锁,写死锁
func Test_deadlock1(t *testing.T) {
// 未初始化的channel,直接写死锁
var ch chan int
ch <- 1
}
func Test_deadlock2(t *testing.T) {
// 未初始化的channel,直接写死锁
var ch chan int
<-ch
}
(2).已初始化的channel
(2.1)无缓冲,直接读死锁,写死锁
func Test_deadlock3(t *testing.T) {
// 初始化无缓冲的channel,直接写死锁
ch := make(chan int)
ch <- 1
}
func Test_deadlock4(t *testing.T) {
// 初始化无缓冲的channel,直接写死锁
ch := make(chan int)
val, ok := <-ch
fmt.Println(val, ok)
}
(2.2)有缓冲
先写后读(读完之后,死锁)
func Test_deadlock5(t *testing.T) {
// 未初始化的channel,直接写死锁
ch := make(chan int, 5)
ch <- 1
ch <- 2
ch <- 3
for v := range ch {
fmt.Println(v)
}
//1
//2
//3
//fatal error: all goroutines are asleep - deadlock!
}
先读后写(死锁)
func Test_deadlock6(t *testing.T) {
// 未初始化的channel,直接写死锁
ch := make(chan int, 5)
for v := range ch {
fmt.Println(v)
}
ch <- 1
ch <- 2
ch <- 3
//fatal error: all goroutines are asleep - deadlock!
}
所以,通信只是发生在1端,容易造成死锁
2.不同的goroutine
五、关闭通道
关闭已关闭的通道,会panic
六、
六、总结
1.通道创建类型有3种:双向,只读,只写
2.通道有容量可设:无缓冲通道和有缓冲通道
3.通道读写互斥
4.已关闭通道,写panic,读有值,再关闭panic
5.通道通常需要2个g一起工作