聊聊 golang 中 channel

news2025/1/8 18:56:46

1、引言

Do not communicate by sharing memory; instead, share memory by communicating

Golang 的并发哲学是“不要通过共享内存进行通信,而要通过通信来共享内存”,提倡通过 channel 进行 goroutine 之间的数据传递和同步,而不是通过共享变量(内存)来实现。

func write(chanInt chan int) {
	for i := 0; i < 10; i++ {
		chanInt <- i
	}
	close(chanInt)
}

func read(chanInt chan int, chanExit chan bool) {
	for {
		v, ok := <-chanInt
		if !ok {
			break
		}
		fmt.Println(v)
	}
	chanExit <- true
	close(chanExit)
}

func TestCSP(t *testing.T) {
	chanInt := make(chan int, 10)
	chanExit := make(chan bool)
	go write(chanInt)
	go read(chanInt, chanExit)

	for {
		select {
		case _, ok := <-chanExit:
			if !ok {
				fmt.Println("done")
				return
			}
		}
	}

}

如上述示例,write 函数负责写,read 函数负责读,chanInt 负责在两个 goroutine 进行数据同步,chanExit 负责监听数据已处理完成,并最终退出。整个程序没有看到锁,非常的优雅。

接下来,来说说 channel 的特性,最后结合底层源码来加深印象。

2、特性

2.1 基本用法

由于 channel 是引用类型,需要用 make 来初始化

chanBuffer := make(chan int, 10)
chanNoBuffer := make(chan int)

这里创建的是可读写的 channel,区别在于是否有 capacity(容量)

  • 带缓冲区的 channel,可以存储 cap 个数据
  • 不带缓冲区的 channel,一般用于同步
chanWriteOnly := make(chan<- int)
chanReadOnly := make(<-chan int)

这里创建的是只写和只读的 channel,不过这样写意义不大,一般用于传参,接下来用这两个 chan 把引言示示例中关于 writeread 函数给改下

func write(chanInt chan<- int) {
	for i := 0; i < 10; i++ {
		chanInt <- i
	}
	close(chanInt)
}

func read(chanInt <-chan int, chanExit chan bool) {
	for {
		v, ok := <-chanInt
		if !ok {
			break
		}
		fmt.Println(v)
	}
	chanExit <- true
	close(chanExit)
}

查看 channel 的长度和容量

func TestChanLenCAP(t *testing.T) {
	chanInt := make(chan int, 2)
	chanInt <- 1
	fmt.Println(len(chanInt)) // 1
	fmt.Println(cap(chanInt)) // 2
}

关闭 channel

close(ch)

判断 channel 是否已关闭

func TestChanIsClosed(t *testing.T) {
	chanInt := make(chan int, 10)
	close(chanInt)
	if _, ok := <-chanInt; !ok {
		fmt.Println("closed")
	}
}

向一个已关闭的 channel 读数据,会读到零值,并且每次读也都是零值,因此可以利用这个特性来判断 channel 是否已关闭。

2.2 异常情况

接下来看看几种需要注意的异常情况

注意: Golang 版本为 1.19.12。不同版本的调度器和运行时的行为可能会有所不同,尤其是与死锁检测相关的机制。这些变化可能导致在某些版本中程序会更快地检测到死锁,而在其他版本中则可能仅仅是阻塞而不报错。

2.2.1 给一个 nil channel发送数据,

func TestWriteNil(t *testing.T) {
	var chanInt chan int
	chanInt <- 1
}

由于 chanInt 还没初始化,值为 nil,此时代码会阻塞在 chanInt <- 1 这一行,并最终形成死锁。

fatal error: all goroutines are asleep - deadlock!

解法:channel 使用前需要使用 make 初始化。

2.2.2 从一个 nil channel 读数据

func TestReadNil(t *testing.T) {
	var chanInt chan int
	<-chanInt
}

由于 chanInt 还没初始化,值为 nil,此时代码会阻塞在 <-chanInt 这一行,并最终形成死锁。

fatal error: all goroutines are asleep - deadlock!

解法:channel 使用前需要使用 make 初始化。

2.2.3 关闭一个 nil channel

func TestCloseNil(t *testing.T) {
	var chanInt chan int
	close(chanInt)
}

如果尝试关闭一个 nilchannel,会导致运行时错误 panic: close of nil channel

panic: close of nil channel [recovered]
	panic: close of nil channel

解法:channel 使用前需要使用 make 初始化。

前三个异常说明,channel 使用前一定要使用 make 进行初始化。

2.2.4 向一个已关闭的 channel 发数据

func TestWriteClosed(t *testing.T) {
	chanNoBuffer := make(chan int)
	close(chanNoBuffer)
	chanNoBuffer <- 1
}

向一个已关闭的 channel 发送数据会引起 panic

panic: send on closed channel [recovered]
	panic: send on closed channel

这是因为一旦 channel 被关闭,就不能再向其发送数据,但可以继续从中接收数据

解法:判断 channel 是否已关闭。

2.2.5 向一个已关闭的 channel 发起重复关闭动作

func TestClosedOnceMore(t *testing.T) {
	chanNoBuffer := make(chan int)
	close(chanNoBuffer)
	close(chanNoBuffer)
}

尝试关闭一个已经关闭的 channel 会导致运行时错误 panic: close of closed channel。这个错误通常出现在多个 goroutine 试图关闭同一个 channel 或者代码逻辑不正确导致同一个 channel 被关闭多次。

panic: close of closed channel [recovered]
	panic: close of closed channel

解法:判断 channel 是否已关闭。

2.2.6 向没有缓冲区的 channel 写数据,但没有读取方

func TestSendNoBuffer(t *testing.T) {
	ch := make(chan int)
	ch <- 4
}

无缓冲的 channel 是一种同步通信机制,当只有发送方,没有接收方,会陷入阻塞而死锁。

fatal error: all goroutines are asleep - deadlock!

解法:无缓冲 channel 是一种同步通信机制,需要发送和接收操作同时进行。

2.2.7 向没有缓冲区的 channel 读取数据,但没有写入方

func TestReadNoBuffer(t *testing.T) {
	ch := make(chan int)
	<-ch
}

尝试从一个无缓冲的 channel 读取数据时,如果没有其他 goroutine 向该 channel 发送数据,读取操作将会阻塞。这会导致程序死锁,并最终导致运行时错误。

fatal error: all goroutines are asleep - deadlock!

解法:无缓冲 channel 是一种同步通信机制,需要发送和接收操作同时进行。

2.2.8 无缓冲区 channel 的发送和接收操作没有同时进行

func ReadNoBufferChan(chanBool chan bool) {
	<-chanBool
}

func TestSendNoBufferChan(t *testing.T) {
	ch := make(chan bool)
	ch <- true
	go ReadNoBufferChan(ch)
	time.Sleep(1 * time.Second)
}

上面两个异常一直强调,由于无缓冲 channel 是一种同步通信机制,需要发送和接收操作同时进行。代码执行到 ch <- chan 时,调度器发现没有任何 goroutine 接收,于是阻塞并死锁。

fatal error: all goroutines are asleep - deadlock!

解法:无缓冲 channel 是一种同步通信机制,需要发送和接收操作同时进行。

func TestSendNoBufferChan(t *testing.T) {
	ch := make(chan bool)
	go ReadNoBufferChan(ch)
	ch <- true
	time.Sleep(1 * time.Second)
}

go ReadNoBufferChan(ch) 提前,这样就确保了在发送数据之前,有一个 goroutine 正在等待接收数据。

对于无缓冲的 channel

  • 读取和写入要成对出现,并且不能在同一个 goroutine
  • 使用 for 读取数据时,写入方需要关闭 channel

2.2.9 向有缓存区的 channel 先读数据

func TestWriteBufferChan(t *testing.T) {
	ch := make(chan int, 1)
	if _, ok := <-ch; !ok {
		fmt.Println("closed")
	}
}

当尝试从一个空的带缓冲的 channel 读取数据时,读取操作会阻塞,直到有数据被写入 channel。这是因为即使是带缓冲的 channel,也需要在读取数据时有数据可读。

带缓冲的 channel 和无缓冲的 channel 的主要区别在于:带缓冲的 channel 可以存储一定数量的数据,而无缓冲的 channel 则需要发送和接收操作同步进行。然而,这并不改变以下事实:当一个 goroutine 试图从空的 channel 读取数据时,它会被阻塞,直到有其他 goroutine 写入数据。

fatal error: all goroutines are asleep - deadlock!

解法:需要在读取数据时有数据可读。

2.2.10 向有缓存区的 channel 写数据,但没有读取数据

func TestReadBufferChan(t *testing.T) {
	ch := make(chan int, 1)
	ch <- 1
	ch <- 2
}

当带缓冲的 channel 在缓冲区满时,写入操作会阻塞,直到有数据被读取以腾出缓冲区空间。如没有读取方,最后就会因阻塞而死锁。

fatal error: all goroutines are asleep - deadlock!

解法:当带缓冲的 channel 在缓冲区满时,需要有读取方,或者增加缓冲区的大小。

注意:对于带缓冲的 channel 在缓冲区没超过容量之前,写入数据,若没有读取,不像不带缓冲区的 channel 那样,不会产生死锁的。

其实,最后这两个带缓冲区 channel 异常情况总结就是

  • 若在同一个 goroutine 里,写数据操作一定在读数据操作前
  • channel 空了,接收者会阻塞
  • channel 满了,发送者会阻塞

3、底层实现

3.1 数据结构

Golangchannel 在运行时使用 runtime.hchan 结构体表示。

// runtime/chan.go
type hchan struct {
    qcount   uint           // 队列中的数据个数
    dataqsiz uint           // 环形缓冲区的大小
    buf      unsafe.Pointer // 环形缓冲区指针
    elemsize uint16         // 单个元素的大小
    closed   uint32         // 标志 channel 是否关闭
    elemtype *_type         // 元素的类型
    sendx    uint           // 发送操作的索引
    recvx    uint           // 接收操作的索引
    recvq    waitq          // 等待接收的 goroutine 队列
    sendq    waitq          // 等待发送的 goroutine 队列
    lock     mutex          // 保护 channel 的锁
}

在这里插入图片描述

先看看环形缓冲区相关的字段:

  • qcount: 当前缓冲区中的元素个数。
  • dataqsiz: 环形缓冲区的容量。
  • buf: 实际存储数据的缓冲区,类型为 unsafe.Pointer(类似 C 语言的 void *)。
  • elemsize: 每个元素的大小。
  • sendx: 环形缓冲区中下一个待写入的位置。
  • recvx: 环形缓冲区中下一个待读取的位置。

再来看看发送和接收队列:

  • recvq: 等待接收的 goroutine 队列。
  • sendq: 等待发送的 goroutine 队列。

这两个队列是通过 waitq 结构体来实现的,waitq 本质上是一个双向链表,链表中的每个节点是一个 sudog 结构体,sudog 代表一个等待中的 goroutine

type waitq struct {
	first *sudog
	last  *sudog
}

最后看看 lock 字段

  • lock 锁用于保护 channel 数据结构的互斥锁。Golang 使用自旋锁和互斥锁的结合来保证 channel 操作的线程安全。

3.2 初始化

func makechan(t *chantype, size int) *hchan {
	elem := t.elem

	// compiler checks this but be safe.
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}

	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
	var c *hchan
	switch {
	case mem == 0:
		// Queue or element size is zero.
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call.
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// Elements contain pointers.
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	lockInit(&c.lock, lockRankHchan)

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
	}
	return c
}

这里主要说下 switch 相关的分支代码

  • 第一个分支:如果 channel 的缓冲区大小是 0(也就是创建无缓冲 channel),或 channel 中的元素大小是 0(如 struct{}{}Golang 中“空结构体”是不占内存的,size0)时,调用 mallocgc() 在堆上为 channel 开辟一段大小为 hchanSize 的内存空间。
    • 这里说下 c.buf = c.raceaddr()c.raceaddr() 会返回一个地址,这个地址在内存中不会被实际用于存储数据,但会被数据竞争检测工具(如 Golangrace detector)用于同步,这也是无缓冲区的 channel 用来做数据同步场景的由来。
  • 第二个分支:如果元素不包含指针时。调用 mallocgc 一次性分配 hchanbuf 的内存。
  • 第三个分支:默认情况元素类型中有指针类型,调用了两次分配空间的函数 new/mallocgc

仔细看,三个分支都调用了 mallocgc 在堆上分配内存,也就说 channel 本身会被 GC 自动回收。

在函数的最后会初始化通道结构的字段,包括元素大小、元素类型、缓冲区大小和锁。

3.2 发送数据

// entry point for c <- x from compiled code
//
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
	chansend(c, elem, true, getcallerpc())
}

/*
 * generic single channel send/recv
 * If block is not nil,
 * then the protocol will not
 * sleep but return if it could
 * not complete.
 *
 * sleep can wake up with g.param == nil
 * when a channel involved in the sleep has
 * been closed.  it is easiest to loop and re-run
 * the operation; we'll see that it's now closed.
 */
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	// 当 channel 为 nil 时处理
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", c, "\n")
	}

	// 竞态检测,是用来分析是否存在数据竞争。go test -race ./...
	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	//
	// After observing that the channel is not closed, we observe that the channel is
	// not ready for sending. Each of these observations is a single word-sized read
	// (first c.closed and second full()).
	// Because a closed channel cannot transition from 'ready for sending' to
	// 'not ready for sending', even if the channel is closed between the two observations,
	// they imply a moment between the two when the channel was both not yet closed
	// and not ready for sending. We behave as if we observed the channel at that moment,
	// and report that the send cannot proceed.
	//
	// It is okay if the reads are reordered here: if we observe that the channel is not
	// ready for sending and then observe that it is not closed, that implies that the
	// channel wasn't closed during the first observation. However, nothing here
	// guarantees forward progress. We rely on the side effects of lock release in
	// chanrecv() and closechan() to update this thread's view of c.closed and full().
	if !block && c.closed == 0 && full(c) {
		return false
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	// 加锁
	lock(&c.lock)

	// 检查 channel 是否关闭
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

	// 检查是否有等待接收的 goroutine
	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	// 检查 channel 缓冲区是否有空位
	if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}

	// 非阻塞模式下
	if !block {
		unlock(&c.lock)
		return false
	}

	// 阻塞模式下,将当前 goroutine 加入发送队列并挂起,receiver 会帮我们完成后续的工作
	// Block on the channel. Some receiver will complete our operation for us.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	// 打包 sudog
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil

	// 将当前这个发送 goroutine 打包后的 sudog 入队到 channel 的 sendq 队列中
	c.sendq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.

	// 将这个发送 g 从 Grunning -> Gwaiting
    // 进入休眠
	atomic.Store8(&gp.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
	KeepAlive(ep)

	// 以下唤醒后需要执行的代码
	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	closed := !mysg.success
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	if closed {
		// 唤醒后,发现 channel 被关闭了
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	return true
}

代码比较长,可以分为两大部分:异常检测和发送数据

3.2.1 异常检测

代码一开始就排除了在异常章节中 nil channel 的情形,比如未初始化,或是被 GC 回收了。

接着会检测非阻塞模式下,也就是有缓冲区的 channel,如果还未 close 并且缓冲区已经满了,则直接返回 false


func TestASyncSendFull(t *testing.T) {
	ch := make(chan int, 1) // 创建一个缓冲区大小为 1 的 channel
	ch <- 1                 // 向 channel 发送一个元素,此时缓冲区已满

	select {
	case ch <- 2: // 尝试发送第二个元素
		fmt.Println("Successfully sent 2")
	default: // 缓冲区已满,进入 default 分支
		fmt.Println("channel is full, unable to send 2")
	}
}

3.2.2 发送数据

发送数据可以归纳为以下三点

  • 直接发送:当 recvq 存在等待的接收者时,那么通过 runtime.send 直接将数据发送给阻塞的接收者
    • 注意:这里不会立马唤醒阻塞的接收者,而是将等待接收数据的 goroutine 标记成可运行状态 grunnable 并把该 goroutine 放到发送方所在的处理器的 runnext 上等待执行,该处理器在下一次调度时会立刻唤醒数据的接收方;
  • 异步发送:当 buf 缓冲区存在空余空间时,将发送的数据写入 channel 的缓冲区;
  • 阻塞发送:当不存在缓冲区或者缓冲区已满时,等待其他 goroutinechannel 接收数据;
    • 将当前 goroutine 加入 sendq 发送队列并挂起,阻塞等待其他的协程从 channel 接收数据;
    • 当唤醒后,检查是否因为 channel 关闭而唤醒,如果是则触发 panic

发送数据的过程中包含几个会触发 goroutine 调度的时机:

  • 发送数据时发现 channel 上存在等待接收数据的 goroutine,立刻设置处理器的 runnext 属性,但是并不会立刻触发调度
  • 发送数据时并没有找到接收方并且缓冲区已经满了,这时会将自己加入 channelsendq 发送队列并调用 runtime.goparkunlock 触发 goroutine 的调度让出处理器的使用权;

3.3 接收数据

// entry points for <- c from compiled code
//
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
	chanrecv(c, elem, true)
}

//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
	_, received = chanrecv(c, elem, true)
	return
}

// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// raceenabled: don't need to check ep, as it is always on the stack
	// or is new memory allocated by reflect.

	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}

	// 如果在 nil channel 上进行 recv 操作,那么会永远阻塞
	if c == nil {
		// 非阻塞的情况下,要直接返回,非阻塞出现在一些 select 的场景中
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	if !block && empty(c) {
		// After observing that the channel is not ready for receiving, we observe whether the
		// channel is closed.
		//
		// Reordering of these checks could lead to incorrect behavior when racing with a close.
		// For example, if the channel was open and not empty, was closed, and then drained,
		// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
		// we use atomic loads for both checks, and rely on emptying and closing to happen in
		// separate critical sections under the same lock.  This assumption fails when closing
		// an unbuffered channel with a blocked send, but that is an error condition anyway.
		if atomic.Load(&c.closed) == 0 {
			// Because a channel cannot be reopened, the later observation of the channel
			// being not closed implies that it was also not closed at the moment of the
			// first observation. We behave as if we observed the channel at that moment
			// and report that the receive cannot proceed.
			return
		}
		// The channel is irreversibly closed. Re-check whether the channel has any pending data
		// to receive, which could have arrived between the empty and closed checks above.
		// Sequential consistency is also required here, when racing with such a send.
		if empty(c) {
			// The channel is irreversibly closed and empty.
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	// 当前 channel 中没有数据可读
	if c.closed != 0 {
		if c.qcount == 0 {
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			unlock(&c.lock)
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
		// The channel has been closed, but the channel's buffer have data.
	} else {
		// sender 队列中有 sudog 在等待
    	// 直接从该 sudog 中获取数据拷贝到当前 g 即可
		// Just found waiting sender with not closed.
		if sg := c.sendq.dequeue(); sg != nil {
			// Found a waiting sender. If buffer is size 0, receive value
			// directly from sender. Otherwise, receive from head of queue
			// and add sender's value to the tail of the queue (both map to
			// the same buffer slot because the queue is full).
			recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
			return true, true
		}
	}

	if c.qcount > 0 {
		// 直接从 buffer 里拷贝数据
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		// 接收索引 +1
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		// buffer 元素计数 -1
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

	// 非阻塞时,且无数据可收
	if !block {
		unlock(&c.lock)
		return false, false
	}

	// no sender available: block on this channel.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	// 打包成 sudog
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	// 进入 recvq 队列
	c.recvq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	atomic.Store8(&gp.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

	// someone woke us up
	// 被唤醒
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	success := mysg.success
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	// 如果 channel 未被关闭,那就是真的 recv 到数据了
	return true, success
}

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if c.dataqsiz == 0 {
		if raceenabled {
			racesync(c, sg)
		}
		if ep != nil {
			// copy data from sender
			// 直接从发送者复制数据
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
		// 缓冲区已满,从队列头部取出数据
		// Queue is full. Take the item at the
		// head of the queue. Make the sender enqueue
		// its item at the tail of the queue. Since the
		// queue is full, those are both the same slot.
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
		}
		// 将数据从队列复制到接收者
		// copy data from queue to receiver
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		// 将数据从发送者复制到队列
		// copy data from sender to queue
		typedmemmove(c.elemtype, qp, sg.elem)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
	sg.elem = nil
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)
}

Golangchannel 中,有两种接收方式

num <- ch
num, ok <- ch

这两种分别对应上述源码中的 chanrecv1chanrecv2,不过最终都会走到 chanrecv 函数。

3.3.1 异常检测

当我们从一个 nil channel 接收数据时(这里 nil 有可能是被 GC 回收导致的),若是非阻塞的 channel 会直接返回,否则会直接调用 runtime.gopark 让出处理器的使用权。

如果当前 channel 已经被 close 并且缓冲区中不存在任何数据,那么会清除 ep 指针中的数据并立刻返回。这里也就说明了为什么可以多次从已关闭的 channel 读取数据而不会报错。

3.3.2 接收数据

channel 接收数据可以归纳为以下三种情况:

3.3.2.1 直接接收

sendq 发送队列存在等待的发送者时,通过 runtime.recv 从阻塞的发送者或者缓冲区中获取数据。具体分为以下两种场景,可以仔细看 recv 函数

  • 场景一

buf 缓冲区的容量 dataqsiz0,也就是同步的 channel,调用 recvDirectsendq 发送队列中 sudog 存储的 ep 数据直接拷贝到接收者的内存地址中。

  • 场景二

当缓冲区已满时(会有两次内存的拷贝)

  • 先取出 buf 缓冲区头部的数据发给接收者(第一次拷贝)
  • 接着取出 sendq 发送队列头的数据拷贝到 buf 缓冲区中,并释放一个 sudog 阻塞的 goroutine(第二次拷贝)

到这里获取有人会问,为什么不直接从 sendq 取出数据发给接收方,而是要从 buf 里取出发给接收方?

原因在于 Golang 在缓冲模式下,channel 的数据在缓冲区中按照 FIFO(先入先出)顺序存储。缓冲区头部的数据肯定是最先存入的,那么也就需要最先取出。

这里再说下场景二下关于 recvxsendx 的更新机制。

  • 缓冲区已满时的处理逻辑

buf 缓冲区满时,recvx 指向的是 buf 的头部位置,这也是下一个将要被接收的数据。注意此时 sendx 也是指向缓冲区的头部位置。因为缓冲区已满,下一次发送会覆盖最旧的数据。

  • 从缓冲区读取数据

此时从已满的 buf 缓冲区读取数据,接收者从缓冲区的头部位置 recvx 获取数据,并将数据传递给接收方。并更新 recvx,使其指向下一个将要被接收的数据位置。

  • sendq 拷贝到缓冲区

由于此时 buf 头部的数据已经发送,那么则取出 sendq 头部的数据覆盖刚刚头部的位置所在的数据,并更新 sendx,使其和 recvx 保持一致,指向下一个要发送的位置。

这两个场景,无论发生哪种情况,运行时都会调用 runtime.goready 将当前处理器的 runnext 设置成发送数据的 goroutine,在调度器下一次调度时将阻塞的发送方唤醒。

3.3.2.2 异步接收

buf 缓冲区的 qcount 大于 0 时,也就是带缓冲的 channel 有数据时,那么会从 buf 缓冲区中 recvx 的索引位置取出数据进行处理:

  • 如果接收数据的内存地址不为空,那么会使用 runtime.typedmemmove 将缓冲区中的数据拷贝到内存中,并通过 runtime.typedmemclr 清除队列中的数据
  • 最后更新 channel 上相关数据:recvx 指向下一个位置(如果移动到了环形队列的队尾,下标需要回到队头),channelqcount 长度减一,并释放持有 channel 的锁
3.3.2.3 阻塞接收

当不属于上述两种情况,即当 channelsendq 发送队列中不存在等待的 goroutine 并且 buf 缓冲区中也不存在任何数据时,从 channel 中接收数据的操作会变成阻塞的。此时会将当前的goroutine 挂起并加入 channel 的接收队列 recvq,以便在有数据可用时能够被唤醒。

当然了,若是 goroutine 被唤醒后会完成 channel 的阻塞数据接收。接收完最后进行基本的参数检查,解除 channel 的绑定并释放 sudog

结合异常检测那一节,发现从 channel 接收数据时,会触发 goroutine 调度的两个时机:

  • channelnil
  • buf 缓冲区中不存在数据并且也不存在数据的发送者时

3.4 关闭管道

最后来看看关闭通道实现

func closechan(c *hchan) {
	// 关闭一个 nil channel 会直接 panic
	if c == nil {
		panic(plainError("close of nil channel"))
	}

	// 上锁,这个锁的粒度比较大,一直到释放完所有的 sudog 才解锁
	lock(&c.lock)
	
	// 在 close channel 时,如果 channel 已经关闭过了,直接触发 panic
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

	if raceenabled {
		callerpc := getcallerpc()
		racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
		racerelease(c.raceaddr())
	}

	c.closed = 1

	var glist gList

	// release all readers
	for {
		sg := c.recvq.dequeue()
		// 弹出的 sudog 是 nil,说明读队列已经空了
		if sg == nil {
			break
		}
		// sg.elem unsafe.Pointer,指向 sudog 的数据元素
        // 该元素可能在堆上分配,也可能在栈上
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		// 将 goroutine 入 glist
        // 为最后将全部 goroutine 都 ready 做准备
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}

	// release all writers (they will panic)
	// 将所有挂在 channel 上的 writer 从 sendq 中弹出
    // 该操作会使所有 writer panic(向一个关闭的 channel 发数据会引起 panic)
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		// 将所有挂在 channel 上的 writer 从 sendq 中弹出
    	// 该操作会使所有 writer panic
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	// 在释放所有挂在 channel 上的读或写 sudog 时,是一直在临界区的
	unlock(&c.lock)

	// Ready all Gs now that we've dropped the channel lock.
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		// 使 g 的状态切换到 Grunnable
		goready(gp, 3)
	}
}

3.4.1 异常检测

  • 关闭一个 nil channel 会直接 panic
  • close channel 时,如果 channel 已经关闭过了,直接触发 panic

3.4.2 释放所有接收方和发送方

关闭 channel 的主要工作是释放所有的 readerswriters

主要就是取出 recvqsendqsudog 加入到 goroutine 待清除 glist 队列中,与此同时该函数会清除所有 runtime.sudog 上未被处理的元素。同时需要注意的是:在处理 sendq 时有可能会 panic,在之前的异常情况中列举往一个 closechannel 发送数据会引起 panic

最后会为所有被阻塞的 goroutine 调用 runtime.goready 触发调度。将所有 glist 队列中的 goroutine 状态从 _Gwaiting 设置为 _Grunnable 状态,等待调度器的调度。

3.4.3 优雅关闭通道

最后说说如何优雅关闭 channel

通过之前的异常小节介绍,发现:

  • 向已关闭的 channel 发送数据,会导致 panic
  • 重复关闭 channel,也会导致 panic

同时,还了解了:

  • 从一个已关闭的 channel 中接收数据,会得到零值,且不会导致程序异常
  • 关闭一个 channel,那么所有接收这个 channelselect case 都会收到信号

那么这里就引用 How to Gracefully Close Channels 介绍的优雅关闭 channel 方法来收尾。

package _0240623

import (
	"log"
	"math/rand"
	"strconv"
	"sync"
	"testing"
	"time"
)

func TesGracefullyCloseChannel(t *testing.T) {
	rand.Seed(time.Now().UnixNano()) // needed before Go 1.20
	log.SetFlags(0)

	// ...
	const Max = 100000
	const NumReceivers = 10
	const NumSenders = 1000

	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(NumReceivers)

	// ...
	dataCh := make(chan int)
	stopCh := make(chan struct{})
	// stopCh is an additional signal channel.
	// Its sender is the moderator goroutine shown
	// below, and its receivers are all senders
	// and receivers of dataCh.
	toStop := make(chan string, 1)
	// The channel toStop is used to notify the
	// moderator to close the additional signal
	// channel (stopCh). Its senders are any senders
	// and receivers of dataCh, and its receiver is
	// the moderator goroutine shown below.
	// It must be a buffered channel.

	var stoppedBy string

	// moderator
	go func() {
		stoppedBy = <-toStop
		close(stopCh)
	}()

	// senders
	for i := 0; i < NumSenders; i++ {
		go func(id string) {
			for {
				value := rand.Intn(Max)
				if value == 0 {
					// Here, the try-send operation is
					// to notify the moderator to close
					// the additional signal channel.
					select {
					case toStop <- "sender#" + id:
					default:
					}
					return
				}

				// The try-receive operation here is to
				// try to exit the sender goroutine as
				// early as possible. Try-receive and
				// try-send select blocks are specially
				// optimized by the standard Go
				// compiler, so they are very efficient.
				select {
				case <-stopCh:
					return
				default:
				}

				// Even if stopCh is closed, the first
				// branch in this select block might be
				// still not selected for some loops
				// (and for ever in theory) if the send
				// to dataCh is also non-blocking. If
				// this is unacceptable, then the above
				// try-receive operation is essential.
				select {
				case <-stopCh:
					return
				case dataCh <- value:
				}
			}
		}(strconv.Itoa(i))
	}

	// receivers
	for i := 0; i < NumReceivers; i++ {
		go func(id string) {
			defer wgReceivers.Done()

			for {
				// Same as the sender goroutine, the
				// try-receive operation here is to
				// try to exit the receiver goroutine
				// as early as possible.
				select {
				case <-stopCh:
					return
				default:
				}

				// Even if stopCh is closed, the first
				// branch in this select block might be
				// still not selected for some loops
				// (and forever in theory) if the receive
				// from dataCh is also non-blocking. If
				// this is not acceptable, then the above
				// try-receive operation is essential.
				select {
				case <-stopCh:
					return
				case value := <-dataCh:
					if value == Max-1 {
						// Here, the same trick is
						// used to notify the moderator
						// to close the additional
						// signal channel.
						select {
						case toStop <- "receiver#" + id:
						default:
						}
						return
					}

					log.Println(value)
				}
			}
		}(strconv.Itoa(i))
	}

	// ...
	wgReceivers.Wait()
	log.Println("stopped by", stoppedBy)
}

这段代码的核心是这里

// moderator
go func() {
	stoppedBy = <-toStop
	close(stopCh)
}()

对于生产者和消费者是 M*N 的情况,显然既不能在生产方关闭通道,也不适合在消费方关闭通道。那么就引入中间方,那就是 toStop,起个 goroutine 然后 stoppedBy = <-toStop 阻塞在这里,只要生产者和消费者一方满足条件,向 toStop 写入数据了,那么就可以关闭 stopCh。这也正好契合上面的 moderator 注释,一个 协调者,用来协调生产者和消费者在 M*N 情况下如何优雅关闭 channel

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1859319.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

YashanDB为新质生产力赋能 灌注合肥区域转型源动力

当前&#xff0c;数据要素已成为我国数字经济的“核心引擎”与“关键生产要素”&#xff0c;为全面激发数据要素的价值&#xff0c;各地区正积极探索数据要素交易平台的可行模式&#xff0c;加快在数据要素领域的布局。近日&#xff0c;深圳计算科学研究院崖山数据库系列产品受…

JDBC从入门到精通-笔记(一):JDBC基本概念与开发基础

视频资源&#xff1a;JDBC从入门到精通视频教程-JDBC实战精讲_哔哩哔哩_bilibili JDBC定义与本质 概念 什么是JDBC&#xff1a;Java DataBase Connectivity JDBC本质&#xff1a;SUN公司制定的一套接口&#xff08;interface&#xff09;&#xff0c;java.sql.*。 面向接口调…

【progressBar-js】优雅的 前端进度条 构建!

progressBar-js JS 前端进度条小工具 您可以通过此工具来构建一个有效的工具条&#xff0c;接下来就是一个示例&#xff01; 使用示例 引入 progressBar-js 库 直接在这里将 css 和 js 文件引入进来就算是成功导入了哦&#xff01;&#xff01;&#xff01; <link href&…

SVN学习(001 svn安装)

尚硅谷SVN高级教程(svn操作详解) 总时长 4:53:00 共72P 此文章包含第1p-第p19的内容 介绍 为什么使用版本控制工具 版本控制工具的功能 版本控制简介 客户端服务器结构 c/s结构 服务端的结构&#xff1a; 服务程序 、版本库(存放我们上传的文件) 客户端的三个基本操作&#…

高考填报志愿,选专业和选学校,哪个优先?

一、 专业优先&#xff0c;还是学校优先&#xff1f; 专业和学校都非常重要&#xff0c;好的学校可以给你提供较高的学习平台&#xff0c;好的专业能够给将来的职业生涯提供便利。高考报考&#xff0c;每一个学校的每一个专业的分数都会不同&#xff0c;热门的专业分数较高&am…

Swift 周报 第五十三期

文章目录 前言新闻和社区苹果公司取得基于波束组合的信道状态信息&#xff08;CSI&#xff09;反馈专利&#xff0c;为 5G 网络中的信道状态信息&#xff08;CSI&#xff09;报告提供新方案关于在欧盟分发 App 的最新信息公司快评&#xff5c;新广告引发不满&#xff0c;苹果也…

大模型揭秘:AI与CatGPT在实体识别中的创新应用

摘要 尽管大规模语言模型 (LLM) 在各种 NLP 任务上已经取得了 SOTA 性能&#xff0c;但它在 NER 上的性能仍然明显低于监督基线。这是由于 NER 和 LLMs 这两个任务之间的差距&#xff1a;前者本质上是序列标记任务&#xff0c;而后者是文本生成模型。在本文中&#xff0c;我们…

被年轻人买爆的转运能量石,戴一天竟等于拍千次胸片?

离谱的事年年有&#xff0c;这几年可以说非常多&#xff01;‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍‍ 尤其是这届年轻人&#xff0c;不知道什么时候开始&#xff0c;越来越迷信了&#xff01; 比如去年很…

基于在校学习平台MOOC的选课推荐系统

基于在校学习平台MOOC的选课推荐系统 1、效果 在线demo&#xff0c;点我查看 2、功能 根据学生于在校学习平台MOOC学习期间的选课记录等相关特征来对学生进行课程推荐。 采用数据挖掘技术&#xff0c;包括BPR、FM、CF&#xff0c;神经网络推荐&#xff0c;用户协同过滤推荐…

2024 CISCN 华东北分区赛-Ahisec

Ahisec战队 WEB python-1 break 源码如下&#xff1a; # -*- coding: UTF-8 -*-from flask import Flask, request,render_template,render_template_stringapp Flask(__name__)def blacklist(name):blacklists ["print","cat","flag",&q…

【嵌入式Linux】<总览> 多进程(更新中)

文章目录 前言 一、进程的概念与结构 1. 相关概念 2. 内核区中的进程结构 3. 进程的状态 4. 获取进程ID函数 二、进程创建 1. fork和vfork函数 2. 额外注意点 3. 构建进程链 4.构建进程扇 三、进程终止 1. C程序的启动过程 2. 进程终止方式 四、特殊的进程 1. 僵…

AppInventor2添加超过10个屏幕会怎样?

之前发过一篇AppInventor2官方翻译文档&#xff0c;建议一个项目不要超过10个屏幕&#xff0c;详见&#xff1a; App Inventor 2 构建多屏幕App的最佳实践 App Inventor 可以轻松地向应用程序添加更多屏幕&#xff0c;但最好也不要添加太多屏幕&#xff0c;因为多个屏幕的应用…

U盘数据恢复全攻略:从原理到实践

一、引言&#xff1a;为何U盘数据恢复至关重要 在信息化时代&#xff0c;U盘作为便携存储设备&#xff0c;广泛应用于各个领域。然而&#xff0c;U盘数据的丢失往往给个人和企业带来极大的困扰。数据丢失的原因多种多样&#xff0c;可能是误删除、格式化、文件系统损坏&#x…

探索约束LLM输出JSON的应用

0、 引言 JSON&#xff08;JavaScript Object Notation&#xff09;因其简洁、易读和易于解析的特性&#xff0c;已成为全球使用最广泛的数据交换格式之一。它能够满足各种数据交换需求&#xff0c;特别是在构建人工智能驱动的应用程序时&#xff0c;工程师们经常需要将大型语…

Jenkins教程-8-上下游关联自动化测试任务构建

上一小节小节我们学习了一下Jenkins自动化测试任务发送测试结果邮件的方法&#xff0c;本小节我们讲解一下Jenkins上下游关联自动化测试任务的构建。 下面我们以一个真实的自动化测试场景来讲解Jenkins如何管理上下游关联任务的触发和构建&#xff0c;比如我们有两个jenkin任务…

基础入门篇 | YOLOv10 项目【训练】【验证】【推理】最简单教程 | YOLOv10必看 | 最新更新,直接打印 FPS,mAP50,75,95

文章目录 训练 --train.py推理 --detect.py验证 --val.py不训练,只查看模型结构/参数量/计算量 --test.pyYOLOv10 是基于 YOLOv8 项目的改进版本,目前已经被 YOLOv8 项目合并,所以两个算法使用方法完全一致~ 今天我给大家展示一种非常方便的使用过程,包含【训练】【验证】…

情绪管理篇:让七情自然流露,不过分压抑也不掺杂极端的想法即可来去自如

情绪管理篇&#xff1a; 人有七情&#xff0c;本属常理&#xff0c;该哭的时候哭、该笑的时候笑、该怒的时候怒、该忧的时候忧 学习圣贤之学&#xff0c;并非让我们像木头人一样&#xff0c;枯木死灰&#xff0c;而要让自己不要被七情所缠缚、被七情所乱心&#xff0c;我们的喜…

QT拖放事件之三:自定义拖放操作-利用QDrag来拖动完成数据的传输

1、运行效果 1)Qt::MoveAction 2)Qt::CopyAction 2、源码 #include "Widget.h" #include "ui_Widget.h" #include "common.h"

JDBC的概念 ,核心API的介绍 , 注册驱动介绍

第一章 JDBC 1、JDBC的概念 目标 能够掌握JDBC的概念能够理解JDBC的作用 讲解 客户端操作MySQL数据库的方式 使用第三方客户端来访问MySQL&#xff1a;SQLyog、Navicat 使用MySQL自带的命令行方式 通过Java来访问MySQL数据库&#xff0c;今天要学习的内容 如何通过Java代…

考研数学|《李林880》正确率多少算合格?

李林880题是针对考研数学三的练习题集&#xff0c;覆盖了考研数学三的主要知识点和题型。如果能够熟练掌握这些题目&#xff0c;意味着对考研数学三的知识点有了较为深入的理解和应用能力。 首先&#xff0c;考研数学三的总分是150分&#xff0c;题型包括单选题、填空题和解答…