GO channel解析

news2024/11/18 1:35:44

GO channel解析

是什么?

官方文档:

  1. https://go.dev/ref/spec#Channel_types
  2. https://go.dev/blog/pipelines(channel提供了流式编程的例子)

在 Go 语言中,channel 是一种用于在 goroutine 之间进行通信和同步的机制。可以用于在不同的 goroutine 之间传递数据。channel 可以是带缓存的或无缓存的,带缓存的 channel 可以缓存一定数量的元素,而在无缓存的 channel 中,发送和接收操作是同步的,这意味着发送操作会被阻塞直到有其他 goroutine 准备好接收数据,而接收操作也会被阻塞直到有其他 goroutine 准备好发送数据。channel 的使用可以有效地避免竞态条件和死锁等并发编程中的问题。

channel 满足并发编程中的通信和同步操作。通过 channel,不同的 goroutine 可以在不使用互斥锁等同步机制的情况下进行数据传递和共享,从而避免了竞态条件和死锁等并发编程中的问题。同时,channel 的使用可以使 goroutine 之间的通信变得更加清晰和简单,代码的可读性和可维护性也会得到提高。因此,channel 是 Go 语言中非常重要的并发编程机制之一。

使用方式

  1. 必须使用make来创建,nil channel是不能使用的

    func main() {
    	chanStr := make(chan string, 1) // 创建一个缓存区长度为1的channel
    	chanStr <- "测试"
    	println(<-chanStr)
    
    	var chanStr2 chan string
    	println(<-chanStr2) // chan receive (nil chan)
    
    }
    
  2. chan中存放和读取数据

    func main() {
    	chanStr := make(chan string, 1)
    	chanStr <- "测试" //存放数据
    	res := <-chanStr // 读取数据
    	println(res)
    }
    
  3. 用完关闭channel

    调用close关闭掉channnel

    func main() {
    	c := make(chan string)
    	close(c)
    }
    

    ps:用完关闭channel是很重要的,channel的关闭原则是 数据的发送发来关闭

  4. channel的几种数据读取方式

    • for range

      func main() {
      	c := gen(1, 2, 3)
      	for i := range c {
      		println(i)
      	}
      }
      // gen函数用来生成一个chan,chan里面输出的是生成的数据流
      func gen(nums ...int) <-chan int {
      	out := make(chan int)
      	go func() {
      		for _, n := range nums {
                  // channel中发送数据
      			out <- n
      		}
              // 在发送完毕之后关闭chanel
      		close(out)
      	}()
      	return out
      }
      

      这种方式很推荐,channel关闭后,会自动退出for range 循环。

    • 直接读取

      func main() {
      	c := gen(1, 2, 3)
      	r1 := <-c
      	r2 := <-c
      	r3 := <-c
      	println(r1) // 1
      	println(r2) // 2
      	println(r3) // 3
      }
      func gen(nums ...int) <-chan int {
      	out := make(chan int)
      	go func() {
      		for _, n := range nums {
      			out <- n
      		}
      		close(out)
      	}()
      	return out
      }
      

      每次读取一个数据

      但这样有个问题:channel被关闭之后,还可以继续读取数据,读取的是chan中类型的零值,int的零值就是0

      func main() {
      	c := gen(1, 2, 3)
      	r1 := <-c
      	r2 := <-c
      	r3 := <-c
      	println(r1) // 1
      	println(r2) // 2
      	println(r3) // 3
      	println(<-c) // 0  再次读取,读取的是零值
      	println(<-c) // 0
      }
      

      这种就有问题了,这种方式怎么获取channel是否被关闭的标示呢?就有了下面的方式。

      这种方式并不是完全没有用处,可以达到广播的作用。

      channel中的数据不是共享的,一份数据不同的goRoutine读取的到的是不一样的,一个数据,被goRoutine读取了就没有了,下一个goRoutine是不会读取到它的。但channel被关闭掉之后,所有的goRoutine读取的都一样了,都是零值

      代码如下

      func main() {
      	c := make(chan int, 10)
      	doneC := make(chan struct{})
      
      	for i := 0; i < 2; i++ { // 启动两个goRoutine
      		go func() {
      			for {
      				i := rand.Int()
      				select {
      				case c<-i:
      				case <-doneC: // done中有信号了,结束,返回
      					println("stop process")
      					return
      				}
      				time.Sleep(1*time.Second)
      			}
      		}()
      	}
      
      	println(<-c)
      	println(<-c)
      	close(doneC)
      	time.Sleep(time.Hour)
      }
      //output:
      8674665223082153551
      5577006791947779410
      stop process
      stop process
      
    • 直接读取,并且携带读取成功标志位

      func main() {
      	c := gen(1, 2, 3)
      	r1 := <-c
      	r2 := <-c
      	r3 := <-c
      	println(r1) // 1
      	println(r2) // 2
      	println(r3) // 3
      
      	r4,ok := <-c
      	fmt.Printf("res:%d,ok:%v\n",r4,ok) //  res:0,ok:false
      }
      

关闭的channel和未初始化的channel

关闭的channel

具体的分析可以看下面源码分析/读取数据部分

  1. channel中的数据已经读取完毕,此时,继续读取,读取的都是channel中指定类型的0值。
  2. channel中的数据没有读取完毕,此时,继续读取,读取的都是channel中的数据。
  3. 不能写入
  4. 调用len返回channel中元素的个数,cap返回channel的容量

未初始化的channel

  1. 不能读取数据
  2. 调用len返回0,cap返回0
  3. close报错

带有方向的channel

通过 <-来指定channel的只读还是只写

package main

import "fmt"

func main() {
    // 创建一个双向通道
    ch1 := make(chan int)

    // 创建一个只能发送数据的通道
    ch2 := make(chan<- int)

    // 创建一个只能接收数据的通道
    ch3 := make(<-chan int)

    // 打印通道的类型
    fmt.Printf("ch1 type: %T\n", ch1)
    fmt.Printf("ch2 type: %T\n", ch2)
    fmt.Printf("ch3 type: %T\n", ch3)
}
//output:
ch1 type: chan int
ch2 type: chan<- int
ch3 type: <-chan int

在这个例子中,我们分别创建了一个双向通道 ch1、一个只能发送数据的通道 ch2 和一个只能接收数据的通道 ch3。通过使用 <- 操作符来指定通道的方向,我们可以将通道限制为只能发送或只能接收数据。在打印通道的类型时,可以看到它们的类型分别是 chan intchan<- int<-chan int,这是由通道的方向性所决定的。

ps: 通道的方向性只是在编译期起作用,对于运行时的数据传输没有任何影响。因此,即使将一个双向通道赋值给一个只能发送或只能接收数据的通道,也不会影响通道的数据传输。 只能是双向channel转单向channel,不能单项channel转双向channel代码如下:

func main() {
	ch1 := make(chan int)
	ch2 := (chan<- int)(ch1)
	ch3 := (<-chan int)(ch1)
	fmt.Printf("%T\n",ch1)
	fmt.Printf("%T\n",ch2)
	fmt.Printf("%T\n",ch3)
}
//output:
chan int
chan<- int
<-chan int
func main() {
	ch1 := make(chan int,1)
	ch1<-1
	go f1(ch1)
	time.Sleep(time.Hour)
}

func f1(d <-chan int)  { // 形参可以直接转换
	println(<-d) //1
}

源码解析

底层操作

利用dlv的disass来查看汇编代码,找出对应的底层函数
这里列举几个常用,剩下的可以自行看

操作对应的底层函数
make(chan int)runtime.makechan
channel<-1 (channel中写数据)$runtime.chansend
<-channel (channel中读数据)$runtime.chanrecv
close(channel) 关闭channel$runtime.closechan

下面呢,我们先给出channel的图,方便理解之后的操作

底层结构体

源码链接:https://github.com/golang/go/blob/release-branch.go1.19/src/runtime/chan.go#L33

channel的底层结构体如下:

type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16
	closed   uint32
	elemtype *_type // element type
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}

type waitq struct { // 等待队列 
	first *sudog
	last  *sudog
}

对应的图如下:
在这里插入图片描述

结合之前Java的同步队列和ReentrantLock 的实现原理,在加上channel的实际表现,可以总结如下:

channel底层维护了一个循环队列,队列中存放的是channel中需要同步的数据,并未维护了发送和接受索引,还有发送和接受的等待队列,我们在make的时候指定容量其实就是底层队列的长度。

发送数据的时候,依次是先看是否有正在等待的接收方,如果有直接copy数据(这样做可以省略掉再次往队列中copy数据),其次,再看缓存区是否可以存放(qcount和dataqsize的关系),通过snedx来指定存放的位置,再者,构建等待队列,当前发送数据的goroutine堵塞,构建发送方等待队列,等待接收方唤醒。

接受数据的操作也一样

创建channel

创建channel主要就是创建hchan结构体,主要包括

  • 分配内存
  • 初始化锁和属性
func makechan(t *chantype, size int) *hchan {
	// chann中元素的类型
	elem := t.elem

	// compiler checks this but be safe.
	// chan中元素的大小是是否超出了限制
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}
	// 计算chan需要的内存大小,其实就是算里面内存的大小*长度
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
	// channel中的元素不包含指针的时候,hchan中也不包含指针
	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
	var c *hchan
	switch {
	case mem == 0:
		// Queue or element size is zero.
		// 直接分配了一个hchan
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		// 竞争检测器使用此位置进行同步
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
		// 不包含指针
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call.
		// 在hchannsize上增加了mem大小
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		// 找到buf,也就是底层的数组
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// Elements contain pointers.
		c = new(hchan)
		//分配底层空间
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.size) // 元素的size
	c.elemtype = elem
	c.dataqsiz = uint(size) // 队列的长度
	lockInit(&c.lock, lockRankHchan) // 初始化锁

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
	}
	return c
}

通过下面代码可以知道,channel是引用传递的,因为返回的是指针,验证代码如下

func main() {
	ch1 := make(chan int,1)
	f1(ch1)
	fmt.Printf("%p \n",ch1)
}

func f1(d <-chan int)  {
	fmt.Printf("%p\n",d)

}
//output:
0xc00001c0e0
0xc00001c0e0

写入数据

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", c, "\n")
	}

	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
	}
   // 
	if !block && c.closed == 0 && full(c) {
		return false
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}
	// 上锁
	lock(&c.lock)
	// 已经关闭,这里可以看到channel被关闭,是不能再发送数据的
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}
    // 等待队列中有等待,直接发送数据
	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}
	// 走到这说明没有等待队列
	if c.qcount < c.dataqsiz {
		// 队列中还有空间
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx) // 找到发送index,这返回的是一个Pointer
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
		typedmemmove(c.elemtype, qp, ep) // copy值
		c.sendx++ //发送索引++
		if c.sendx == c.dataqsiz {
			// 归0
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}

	if !block {
		unlock(&c.lock)
		return false
	}

	// Block on the channel. Some receiver will complete our operation for us.
	gp := getg() // 这里要开始堵塞了 // 拿到goRoutine
	mysg := acquireSudog() // 构建发送的等待队列
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg) // 入队
	
	atomic.Store8(&gp.parkingOnChan, 1)
	// goRoutine就park在这里了
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)

	KeepAlive(ep) // 保证此对象一直存活

	// 当前goroutine被唤醒,唤醒操作不在这里,在chanrecv,并且在它里面有赋值操作,将缓冲区的数据copy到它里面
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	closed := !mysg.success // 表示是否成功,用于下面的判断, 表示channel是否关闭,也就是说channel关闭,goroutine幻想,并且报错
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	if closed {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	return true
}
// 发送操作
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  // 这是的竞争检测
	if raceenabled {
		if c.dataqsiz == 0 {
			racesync(c, sg)
		} else {
			// Pretend we go through the buffer, even though
			// we copy directly. Note that we need to increment
			// the head/tail locations only when raceenabled.
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
			c.recvx++
			if c.recvx == c.dataqsiz {
				c.recvx = 0
			}
			c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
		}
	}
	if sg.elem != nil {
    // 这里是直接发送,就是将ep copy到sg中
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
  // 唤醒
	goready(gp, skip+1)
}

上面的代码我增加了注释,其中关于go park goroutine的操作我没有看懂,之后又机会会在分析一下,但这不影响看整体代码逻辑,我对上面代码的总结如下:

  • 操作之前先上锁,操作完之后记得要释放锁

  • close的channel是不能发送数据的

  • 接收方的等待队列中出队一个元素,如果有,直接将发送的值copy到这个元素中,并且唤醒它,函数结束

  • 查看循环队列(缓冲区)中还有还有空间,将当前元素存放在这里通过sendX,函数返回

  • 构建sudog,存放在发送方的等待队列中,当前goroutine park住。等待从channel中接收数据的时候唤醒它。

不带缓存区的channel和带缓存区的channel有啥区别?

一个有循环队列,一个没有,对于上面的逻辑来说,没有循环队列,就会直接park住当前的goroutine。

读取数据

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// block表示是否阻塞
	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}
	// channel没有初始化
	if c == nil {
		if !block { // 非阻塞
			return
		}
		// 这里表示堵塞,当前goroutine park住
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}


	if !block && empty(c) {
		if atomic.Load(&c.closed) == 0 {
			return
		}
		if empty(c) {
			// The channel is irreversibly closed and empty.
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}
	// 加锁
	lock(&c.lock)
	// channel是否关闭
	if c.closed != 0 { // channel已经关闭
		//buffer中没有数据
		if c.qcount == 0 {
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			unlock(&c.lock)
			if ep != nil {
        // 清空ep,这个时候ep返回的就是这个类型的0值
				typedmemclr(c.elemtype, ep) 
			}
			// 从这里开始,channel已经关闭,并且buffer中没有数据
      // 所以,我们在使用channel的时候,比如 a,ok:= <-c ,ok并不是表示当前channel是否关闭,而是表示当前操作是否接收到数据
			return true, false  
		}
		// 走到这里,说明channel已经关闭,并且buffer中有数据
	} else {
		//channel没有关闭,
		// 发送队列中出队
		if sg := c.sendq.dequeue(); sg != nil {
			// Found a waiting sender. If buffer is size 0, receive value
			// directly from sender. Otherwise, receive from head of queue
			// and add sender's value to the tail of the queue (both map to
			// the same buffer slot because the queue is full).
			// 发送数据
			recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
			return true, true
		}
	}
  
	// buffer中有数据  走到这里有下面的几种可能性
  // 1. channel没有关闭,并且没有发送方的等待队列
  // 2. channel关闭,但buffer中有数据,并且没有发送的等待队列
	if c.qcount > 0 {
		// Receive directly from queue
		qp := chanbuf(c, c.recvx) //找到接收位
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp) //移动数据,将数据从buffer中移动到ep中(qp->eq)
		}
		typedmemclr(c.elemtype, qp) // 会将buffer中的内存清楚掉
		c.recvx++ // 索引++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount-- // 数量--
		unlock(&c.lock)
		return true, true
	}

	if !block {
		unlock(&c.lock)
		return false, false
	}

	// 到这里,需要park住当前goroutine
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg) // 构建等待队列
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	atomic.Store8(&gp.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) // 当前goroutine park住

  // 被唤醒
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	success := mysg.success
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg) // 释放sudog
	return true, success
}

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	// buffer长度为0
	if c.dataqsiz == 0 {
		if raceenabled {
			racesync(c, sg)
		}
		if ep != nil {
			// 直接将数据将数据copy到ep中
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
		// Queue is full. Take the item at the
		// head of the queue. Make the sender enqueue
		// its item at the tail of the queue. Since the
		// queue is full, those are both the same slot.
    // 拿到读取的index
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
		}
		// copy data from queue to receiver
		if ep != nil {
      // 从buffer中copy到ep中
			typedmemmove(c.elemtype, ep, qp)
		}
		// copy data from sender to queu
		typedmemmove(c.elemtype, qp, sg.elem)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
    // 发送的x = 接收的x
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
	sg.elem = nil
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	// 唤醒等待的goroutine
	goready(gp, skip+1)
}

上面代码解释如下:

简单来说,和上面发送是一样的流程

  • 加锁,操作结束之后需要释放锁
  • channel已经关闭并且channel中没有数据,清空内存,返回false
  • 发送等待队列中是否有等待的groutine,如果有出队,并且从buffer(recvx)中捞一个数据,给等待的goRoutine,并且唤醒。
  • 看buffer中是否还有数据,有的话,就通过recvx来获取一个元素,并且返回
  • 构建sudog,如读取等待队列,park住当前goroutine,等待写入操作唤醒

需要注意:

通过上面的代码可以知道下面的几个问题

  1. 直接读取,并且携带读取成功标志位的这种读取方式,返回的bool表示的是是否读取到值,也不是channel是否关闭
  2. channel关闭后,如果channel的buffer中还有数据,还是可以读取到的,并且boo值为true,只有在channel中buffer没有数据,bool返回为false
// demo1
func main() {
	// buffer长度为10
	intC := make(chan int, 10)
	// 发送10个数据
	for i := 0; i < 10; i++ {
		intC <- i
	}
	// 关闭channel
	close(intC)
	// 从channel中读取数据,到此时,channel已经关闭了,但还是可以继续读取出来channel中的数据
	for i := range intC {
		println(i)
	}
}

// demo2
func main() {
	// buffer长度为10
	intC := make(chan int, 2)
	// 发送10个数据
	for i := 0; i < 2; i++ {
		intC <- i
	}
	// 关闭channel
	close(intC)
	// 从channel中读取数据,到此时,channel已经关闭了,但还是可以继续读取出来channel中的数据
	i,ok := <-intC   // buffer中还有数据,所以ok为true
	println(i,ok)

	i1,ok1 := <-intC
	println(i1,ok1)

	i2,ok2 := <-intC // buffer中没有数据,所以ok为false,并且当前线程不会park
	println(i2,ok2)

	i3,ok3 := <-intC
	println(i3,ok3)
}
//output:
0 true
1 true
0 false
0 false

关闭channel

func closechan(c *hchan) {
	// nil的channel是不能关闭的
	if c == nil {
		panic(plainError("close of nil channel"))
	}
	// 加锁
	lock(&c.lock)
	if c.closed != 0 {
		// 已经关闭过了,报错
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

	if raceenabled {
		callerpc := getcallerpc()
		racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
		racerelease(c.raceaddr())
	}
	// 关闭
	c.closed = 1

	var glist gList

	// release all readers
	// 释放所有的reader。释放接收方等待队列
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
		if sg.elem != nil {
			// 清空元素,并且清空内存
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}

	// release all writers (they will panic)
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
    // 表示当前goroutine并没有成功获取到数据,在读取数据的时候,goroutine park唤醒之后返回的就是success
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	unlock(&c.lock)

	// Ready all Gs now that we've dropped the channel lock.
	for !glist.empty() {
		// 唤醒等待的goroutine
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
}

对应解释如下

  • 一堆判断
  • 设置closed标志位
  • 释放所有的发送等待方的队列和发送者的队列。

结合上面写入和读取数据的逻辑,总结如下

  1. 发送

    释放所有的发送者的等待队列,报错。已关闭的channel不能发送数据

  2. 读取

    释放所有的等待队列,不报错,返回0值

验证代码如下:

  1. 发送
func main() {
	ints := make(chan int)
	for i := 0; i < 10; i++ {
		go func(index int) {
			ints <- index
			println(index,"done")
		}(i)
	}
	time.Sleep(1* time.Second)
	close(ints)
	time.Sleep(time.Hour)
}
//output
panic: send on closed channel

goroutine 9 [running]:
main.main.func1(0x6)
        /Users/lc/GolandProjects/awesomeProject4/main.go:9 +0x30
created by main.main
        /Users/lc/GolandProjects/awesomeProject4/main.go:8 +0x40

在这里插入图片描述

  1. 接收

    func main() {
    	ints := make(chan int)
    	for i := 0; i < 10; i++ {
    		go func(index int) {
    			i,ok := <-ints
    			println(index,"done",i,ok)
    		}(i)
    	}
    	time.Sleep(1* time.Second)
    	close(ints)
    	time.Sleep(time.Hour)
    }
    //output:
    4 done 0 false
    6 done 0 false
    5 done 0 false
    9 done 0 false
    0 done 0 false
    8 done 0 false
    3 done 0 false
    2 done 0 false
    1 done 0 false
    7 done 0 false
    

在这里插入图片描述

关于channel的分析就到这里了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/687447.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【FFmpeg实战】音频解码与编码流程

解码流程 音频编解码流程与视频编解码流程一致&#xff0c;我们可以对 mp4 文件的音频流进行解码&#xff0c;并将解码后的音频数据保存到 PCM 文件中&#xff0c;后续我们可以通过读取 PCM 文件中的数据实现音频流的编码操作 FFmpeg音频解码流程 extern"C" { #inc…

ICC2: Create Placement Blockage

area-based的placement blockage有四种,hard、hard macro、soft,partial。hard 属性限制所有standard cell、hard macro放进hard blockage中;hard macro仅限制hard macro(如sram);soft属性限制placement的init_place阶段(也叫coarse placement)把standard cell和hard macro…

Vuex学习

5.1.理解 Vuex 5.1.1.Vuex 是什么 概念&#xff1a;专门在Vue中实现集中式状态&#xff08;数据&#xff09;管理的一个Vue插件&#xff0c;对Vue应用中多个组件的共享状态进行集中式的管理&#xff08;读/写&#xff09;&#xff0c;也是一种组件间通信的方式&#xff0c;且适…

深度学习05-CNN循环神经网络

概述 循环神经网络&#xff08;Recurrent Neural Network&#xff0c;RNN&#xff09;是一种具有循环连接的神经网络结构&#xff0c;被广泛应用于自然语言处理、语音识别、时序数据分析等任务中。相较于传统神经网络&#xff0c;RNN的主要特点在于它可以处理序列数据&#xf…

超全汇总,性能测试常用指标大全(重要)

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 两种性能指标 业…

Java又双叒叕“凉”了?

前几天&#xff0c;TIOBE的一份6月编程语言榜单公布&#xff1a;Java退出前三&#xff0c;位居第四。一波Java凉了的言论甚嚣尘上。其实不止Java&#xff0c;python、C、C&#xff0c;哪一个没被提过“凉”... 而现实是&#xff0c;Java的招聘需求依然很大&#xff1a; 不可否…

C++静态和动态链接库导出和使用

1、简介 代码开发过程中会遇到很多已有的函数库&#xff0c;这些函数库是现有的&#xff0c;成熟的&#xff0c;可以复用的代码。现实中每个程序都要依赖很多基础的底层库&#xff0c;不可能每个人的代码都从零开始&#xff0c;因此库的存在意义非同寻常。 本质上来说库是一种…

便携式水质自动采样器助力毒情监测

便携式水质自动采样器可助力毒情监测&#xff1a; 污水涉毒采样检测工作是运用科技手段准确评估监测辖区内毒情形势的重要手段。期间&#xff0c;民警详细了解了生活和工业污水的处理、排放以及服务范围、人口数量等情况&#xff0c;并就污水涉毒采样检测工作达成共识。随后&am…

revit中用幕墙来绘制瓦片屋面和生成土钉墙

一、revit中用幕墙来绘制瓦片屋面 层层叠叠的瓦片在我们绘制时具有复杂性&#xff0c;瓦片既美观又满足一些建筑的需要&#xff0c;下面教大家一个用幕墙来绘制瓦片屋面。 新建一个族样板选择公制轮廓—竖挺&#xff0c;绘制我们的瓦片形状 简单的绘制一个瓦片的形状&#xff0…

JVM学习整理(一)

一、JVM的基本介绍 JVM 是 Java Virtual Machine 的缩写&#xff0c;它是一个虚构出来的计算机&#xff0c;一种规范。通过在实际的计算机上仿真模拟各类计算机功能实现 好&#xff0c;其实抛开这么专业的句子不说&#xff0c;就知道JVM其实就类似于一台小电脑运行在windows或…

ruoyi-vue前后端分离项目实现一体化打包(前后端合并打包)

场景 现在要对ruoyi-vue前后端分离项目&#xff0c;进行一体化打包&#xff0c;即 将前后端项目打在一个jar里面 一体化打包优点 不需要再使用nginx&#xff0c;直接将前端文件放到后端项目里面 改造ruoyi-vue项目 后端改造 1、引入依赖spring-boot-starter-thymeleaf &…

倒计时 1 天 | SphereEx 在 2023 亚马逊云科技中国峰会等你来打卡!

2023 年 6 月 27 - 28 日&#xff0c; “因构建而可见” 2023 亚马逊云科技中国峰会将在上海隆重举行&#xff0c;SphereEx 将携面向新一代数据架构的数据库增强引擎&#xff1a;SphereEx-DBPlusEngine 亮相亚马逊云科技中国峰会&#xff0c;展示分布式数据库、数据安全、信创替…

FFmpeg视频转码关键参数详解

1 固定码率因子crf&#xff08;Constant Rate Factor&#xff09; 固定码率因子&#xff08;CRF&#xff09;是 x264 和 x265 编码器的默认质量&#xff08;和码率控制&#xff09;设置。取值范围是 0 到 51&#xff0c;这其中越低的值&#xff0c;结果质量越好&#xff0c;同…

实力见证丨酷雷曼VR再获2项国家发明专利

近日&#xff0c;酷雷曼公司&#xff08;北京同创蓝天云科技有限公司&#xff09;再次喜获两项发明专利证书:“VR多端协同交互方法及相关设备”、“VR展示用户操作方法及相关设备” 。两项专利均基于酷雷曼3D VR系统发明&#xff0c;进一步优化了目前VR全景触控界面互动性及交互…

企业所得税高是怎么回事?该如何解决?

企业所得税高是怎么回事&#xff1f;该如何解决&#xff1f; 《税筹顾问》专注于园区招商、企业税务筹划&#xff0c;合理合规助力企业节税! 企业所得税高&#xff0c;一般企业都会运用一些税务筹划的方式来解决&#xff0c;那么事前的规划和搭建好业务框架就显得尤为重要。真…

FFmpeg初识

一、简介 它的官网为&#xff1a;https://ffmpeg.org/&#xff0c;由Fabrice Bellard&#xff08;法国著名程序员Born in 1972&#xff09;于2000年发起创建的开源项目。该人是个牛人&#xff0c;在很多领域都有很大的贡献。 FFmpeg是多媒体领域的万能工具。只要涉及音视频领域…

第43步 深度学习图像识别:InceptionResnetV2建模(Tensorflow)

基于WIN10的64位系统演示 一、写在前面 &#xff08;1&#xff09;InceptionResnetV2 InceptionResNetV2是一种由Google研究人员开发的深度学习模型&#xff0c;是一种混合了Inception和ResNet&#xff08;残差网络&#xff09;两种结构的卷积神经网络&#xff08;CNN&#…

通过cifar-10数据集理解numpy数组的长(H)、宽(W)、通道(C)

文章目录 1、CIFAR-10数据集介绍1.1 CIFAR-10数据集的构成1.2 batches.meta1.3 data_batch_n.py & test_batch.py 2、获取一张图片的data数据2.1 反序列化获得numpy数据2.2 清楚numpy中的H、W、C的含义2.3 清楚RGB图片在numpy中的表示 3、处理图片数据的代码 1、CIFAR-10数…

渗透测试实战-BurpSuite 使用入门

前言 近期笔者在学习 web 渗透测试的相关内容&#xff0c;主要是为了公司之后的安全产品服务。渗透测试本身在学习过程中还是很有意思的&#xff0c;有一种学习到了之前想学但是没学的黑客技术的感觉&#xff0c;并且对笔者已掌握的许多知识做了有益的补充。要学习渗透测试&…

ThinkPHP 对接美团大众点评团购券(门票)

一、功能简要介绍 1、根据需求&#xff0c;用户在美团大众点评中所购买的门票在自己的系统上可以核销&#xff0c;同时把核销信息存储到自己的系统里。2、美团点评API文档地址&#xff1a;[https://open.dianping.com/document/v2?rootDocId5000](https://open.dianping.com/…