golang并发安全-select

news2024/11/17 0:00:29

前面说了golang的channel, 今天我们看看golang select 是怎么实现的。

数据结构

type scase struct {
	c    *hchan         // chan
	elem unsafe.Pointer // 数据
}

select 非默认的case 中都是处理channel 的 接受和发送,所有scase 结构体中c是用来存储select 的case中使用的channel

处理流程

select case 场景

编译器在中间代码生成期间会根据 select 中 case 的不同对控制语句进行优化,这一过程都发生在cmd/compile/internal/walk/select.go 中,下面会根据不同的场景进行分析代码。

没有case

代码示例

func main() {
    select {}
}

如果是空的select语句,程序会被阻塞,golang 带有死锁监测机制:如果当前写成无法被唤醒,则会panic

源码解读

在runtime/select.go中可以看到:如果cases为空直接调用gopark函数以waitReasonSelectNoCases的原因挂起当前的协程,并且无法被唤醒,golang监测到直接panic。

同样我们在walk/select.go的walkSelectCases函数中可以看到,如果case为空直接调用runtime.block函数

只有一个case

代码示例

func main() {
	ch := make(chan int)
	go func() {
		ch <- 1
	}()
	select {
	case data := <-ch:
		fmt.Println("ch data:", data)
	}
}

如果有输入直接打印ch data : 1 , 没有的话会被检测出all goroutines are asleep - deadlock!(和没有case的一样)

源码解读

如果一个非default case ,将读写转换成 ch <- 或 <- ch, 正常的channel读写

func walkSelectCases(cases []*ir.CommClause) []ir.Node {
	// optimization: one-case select: single op.
	if ncas == 1 {
		cas := cases[0] //获取case
		ir.SetPos(cas)
		l := cas.Init()
		if cas.Comm != nil { // 不是默认
			n := cas.Comm // 获取case的条件语句
			l = append(l, ir.TakeInit(n)...)
			switch n.Op() {
			default:
				base.Fatalf("select %v", n.Op())

			case ir.OSEND: // 如果是 send, 无须处理
				// already ok

			case ir.OSELRECV2:
				r := n.(*ir.AssignListStmt)
                // 如果不是 data, ok := <- ch 类型,处理成<- ch
				if ir.IsBlank(r.Lhs[0]) && ir.IsBlank(r.Lhs[1]) {
					n = r.Rhs[0]
					break
				}
                // 是的话, op设置成data, ok := <- ch形式
				r.SetOp(ir.OAS2RECV)
			}

			l = append(l, n)
		}
        // 将case 条件后要执行的语句加入带执行的列表
		l = append(l, cas.Body...)
        // 加入 break类型,跳出select-case 
		l = append(l, ir.NewBranchStmt(base.Pos, ir.OBREAK, nil))
		return l
	}
    // convert case value arguments to addresses.
	// this rewrite is used by both the general code and the next optimization.
	var dflt *ir.CommClause
	for _, cas := range cases {
		ir.SetPos(cas)
		n := cas.Comm
		if n == nil {
			dflt = cas
			continue
		}
		switch n.Op() {
		case ir.OSEND:
			n := n.(*ir.SendStmt)
			n.Value = typecheck.NodAddr(n.Value)
			n.Value = typecheck.Expr(n.Value)

		case ir.OSELRECV2:
			n := n.(*ir.AssignListStmt)
			if !ir.IsBlank(n.Lhs[0]) {
				n.Lhs[0] = typecheck.NodAddr(n.Lhs[0])
				n.Lhs[0] = typecheck.Expr(n.Lhs[0])
			}
		}
	}
}

两个case(一个default)

代码示例

func main() {
	ch := make(chan int)
	select {
	case data := <-ch:
		fmt.Println("ch data:", data)
	default:
		fmt.Println("default")
	}
}

如果写入就走<- 读取,反之走默认

源码解读

如果是两个case,其中一个是default,非default的会根据send还是recv 调用channel的selectnbsend和 selectnbrecv。这两个方法是非阻塞的

func walkSelectCases(cases []*ir.CommClause) []ir.Node){
	// optimization: two-case select but one is default: single non-blocking op.
	if ncas == 2 && dflt != nil {
		cas := cases[0]
		if cas == dflt { // 如果是default 放在 cases[1]
			cas = cases[1]
		}

		n := cas.Comm
		ir.SetPos(n)
		r := ir.NewIfStmt(base.Pos, nil, nil, nil)
		r.SetInit(cas.Init())
		var cond ir.Node
		switch n.Op() {
		default:
			base.Fatalf("select %v", n.Op())

		case ir.OSEND:
            // 调用selectnbsend(c, v)
			// if selectnbsend(c, v) { body } else { default body }
			n := n.(*ir.SendStmt)
			ch := n.Chan
			cond = mkcall1(chanfn("selectnbsend", 2, ch.Type()), types.Types[types.TBOOL], r.PtrInit(), ch, n.Value)

		case ir.OSELRECV2:
			n := n.(*ir.AssignListStmt)
			recv := n.Rhs[0].(*ir.UnaryExpr)
			ch := recv.X
			elem := n.Lhs[0]
			if ir.IsBlank(elem) { //空的话 elem= NodNil
				elem = typecheck.NodNil()
			}
			cond = typecheck.Temp(types.Types[types.TBOOL])
            // 调用 selectnbrecv
			fn := chanfn("selectnbrecv", 2, ch.Type())
			call := mkcall1(fn, fn.Type().Results(), r.PtrInit(), elem, ch)
			as := ir.NewAssignListStmt(r.Pos(), ir.OAS2, []ir.Node{cond, n.Lhs[1]}, []ir.Node{call})
			r.PtrInit().Append(typecheck.Stmt(as))
		}

		r.Cond = typecheck.Expr(cond)
		r.Body = cas.Body
		r.Else = append(dflt.Init(), dflt.Body...)
		return []ir.Node{r, ir.NewBranchStmt(base.Pos, ir.OBREAK, nil)}
	}    
}

每次尝试从channel读/写值,如果不成功则直接返回,不会阻塞。从selectnbsend和selectnbrecv看出,最后转换成if-else

// compiler implements
//
//	select {
//	case c <- v:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selectnbsend(c, v) {
//		... foo
//	} else {
//		... bar
//	}
//
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
    // block:false
    // chan将 select准换if-else
	return chansend(c, elem, false, getcallerpc())
}

// compiler implements
//
//	select {
//	case v, ok = <-c:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selected, ok = selectnbrecv(&v, c); selected {
//		... foo
//	} else {
//		... bar
//	}
//
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
	// block:false
    // chan将 select准换if-else
    return chanrecv(c, elem, false)
}

多个case

代码示例

func main() {
	ch := make(chan int)
	go func() {
		tempArr := []int{1,2,3,4,5,6}
		for i := range tempArr {
			ch <- i
		}
	}()
	go func() {
		for {
			select {
			case i := <-ch:
				println("first: ", i)
			case i := <-ch:
				println("second", i)
			}
		}
	}()
	time.Sleep(3 * time.Second)

}

可以看到多个case,会随机选取一个case执行

源码解读

func walkSelectCases(cases []*ir.CommClause) []ir.Node {
	ncas := len(cases)
	sellineno := base.Pos
	if dflt != nil {
		ncas--
	}
    // 定义casorder为ncas大小的case语句的数组
	casorder := make([]*ir.CommClause, ncas)
    // 分别定义nsends为发送channel的case个数,nrecvs为接收channel的case个数
	nsends, nrecvs := 0, 0
    // 多case编译后待执行的语句列表
	var init []ir.Node

	// generate sel-struct
	base.Pos = sellineno
    // 定义selv为长度为ncas的scase类型的数组
    // scasetype()函数返回的就是scase结构体,包含c和elem两个字段
	selv := typecheck.Temp(types.NewArray(scasetype(), int64(ncas)))
	init = append(init, typecheck.Stmt(ir.NewAssignStmt(base.Pos, selv, nil)))

	// No initialization for order; runtime.selectgo is responsible for that.
	// 定义order为2倍的ncas长度的TUINT16类型的数组
    // 注意:selv和order作为runtime.selectgo()函数的入参,前者存放scase列表内存地址,后者用来做scase排序使用,排序是为了便于挑选出待执行的case
    order := typecheck.Temp(types.NewArray(types.Types[types.TUINT16], 2*int64(ncas)))

	var pc0, pcs ir.Node
	if base.Flag.Race {
		pcs = typecheck.Temp(types.NewArray(types.Types[types.TUINTPTR], int64(ncas)))
		pc0 = typecheck.Expr(typecheck.NodAddr(ir.NewIndexExpr(base.Pos, pcs, ir.NewInt(0))))
	} else {
		pc0 = typecheck.NodNil()
	}

	// register cases 遍历case生成scase对象放到selv中
	for _, cas := range cases {
		ir.SetPos(cas)

		init = append(init, ir.TakeInit(cas)...)

		n := cas.Comm
		if n == nil { // default:
			continue
		}

		var i int
		var c, elem ir.Node
		switch n.Op() { // 根据类型获取chan, elem的值
		default:
			base.Fatalf("select %v", n.Op())
		case ir.OSEND: // 发送chan类型,i从0开始递增
			n := n.(*ir.SendStmt)
			i = nsends
			nsends++
			c = n.Chan
			elem = n.Value
		case ir.OSELRECV2: // 接收chan,i从ncas开始递减
			n := n.(*ir.AssignListStmt)
			nrecvs++
			i = ncas - nrecvs
			recv := n.Rhs[0].(*ir.UnaryExpr)
			c = recv.X
			elem = n.Lhs[0]
		}

		casorder[i] = cas
       // 定义一个函数,写入c或elem到selv数组
		setField := func(f string, val ir.Node) {
            // 放到selv数组
			r := ir.NewAssignStmt(base.Pos, ir.NewSelectorExpr(base.Pos, ir.ODOT, ir.NewIndexExpr(base.Pos, selv, ir.NewInt(int64(i))), typecheck.Lookup(f)), val)
            // 添加到带执行列表
			init = append(init, typecheck.Stmt(r))
		}

		c = typecheck.ConvNop(c, types.Types[types.TUNSAFEPTR])
		setField("c", c)
		if !ir.IsBlank(elem) {
			elem = typecheck.ConvNop(elem, types.Types[types.TUNSAFEPTR])
			setField("elem", elem)
		}

		// TODO(mdempsky): There should be a cleaner way to
		// handle this.
		if base.Flag.Race {
			r := mkcallstmt("selectsetpc", typecheck.NodAddr(ir.NewIndexExpr(base.Pos, pcs, ir.NewInt(int64(i)))))
			init = append(init, r)
		}
	}
    // 如果发送chan和接收chan的个数不等于ncas,直接报错
	if nsends+nrecvs != ncas {
		base.Fatalf("walkSelectCases: miscount: %v + %v != %v", nsends, nrecvs, ncas)
	}

	// run the select  开始执行select动作
	base.Pos = sellineno
    // 定义chosen, recvOK作为selectgo()函数的两个返回值
    // chosen 表示被选中的case的索引,recvOK表示对于接收操作,是否成功接收
	chosen := typecheck.Temp(types.Types[types.TINT])
	recvOK := typecheck.Temp(types.Types[types.TBOOL])
	r := ir.NewAssignListStmt(base.Pos, ir.OAS2, nil, nil)
	r.Lhs = []ir.Node{chosen, recvOK}
    // 调用runtime.selectgo()函数作为运行时实际执行多case的select动作的函数
	fn := typecheck.LookupRuntime("selectgo")
	var fnInit ir.Nodes
	r.Rhs = []ir.Node{mkcall1(fn, fn.Type().Results(), &fnInit, bytePtrToIndex(selv, 0), bytePtrToIndex(order, 0), pc0, ir.NewInt(int64(nsends)), ir.NewInt(int64(nrecvs)), ir.NewBool(dflt == nil))}
	init = append(init, fnInit...)
	init = append(init, typecheck.Stmt(r))

	// selv and order are no longer alive after selectgo.
    // 执行完selectgo()函数后,销毁selv和order数组.
	init = append(init, ir.NewUnaryExpr(base.Pos, ir.OVARKILL, selv))
	init = append(init, ir.NewUnaryExpr(base.Pos, ir.OVARKILL, order))
	if base.Flag.Race {
		init = append(init, ir.NewUnaryExpr(base.Pos, ir.OVARKILL, pcs))
	}

	// dispatch cases 
    //定义一个函数,根据chosen确定的case分支生成if语句,执行该分支的语句
	dispatch := func(cond ir.Node, cas *ir.CommClause) {
		cond = typecheck.Expr(cond)
		cond = typecheck.DefaultLit(cond, nil)

		r := ir.NewIfStmt(base.Pos, cond, nil, nil)

		if n := cas.Comm; n != nil && n.Op() == ir.OSELRECV2 {
			n := n.(*ir.AssignListStmt)
			if !ir.IsBlank(n.Lhs[1]) {
				x := ir.NewAssignStmt(base.Pos, n.Lhs[1], recvOK)
				r.Body.Append(typecheck.Stmt(x))
			}
		}

		r.Body.Append(cas.Body.Take()...)
		r.Body.Append(ir.NewBranchStmt(base.Pos, ir.OBREAK, nil))
		init = append(init, r)
	}
    // 如果多case中有default分支,并且chosen小于0,执行该default分支
	if dflt != nil {
		ir.SetPos(dflt)
		dispatch(ir.NewBinaryExpr(base.Pos, ir.OLT, chosen, ir.NewInt(0)), dflt)
	}
    // 如果有chosen选中的case分支,即chosen等于i,则执行该分支
	for i, cas := range casorder {
		ir.SetPos(cas)
		dispatch(ir.NewBinaryExpr(base.Pos, ir.OEQ, chosen, ir.NewInt(int64(i))), cas)
	}

	return init
}

从上面代码可以看出:

1- 初始化过程: 生成scase数组,定义selv 存放scase数组内存地址,定义order 来给scase排序

2- 遍历所有的case ,将case放到带执行列表(不包括default)

3- 调用runtime。selectgo并将selv和order作为入参传入selectgo

4- 根据selectgo返回的chosen来生成if语句,执行对应的case

解锁加锁

加锁的顺序和解锁的顺序相反。


func sellock(scases []scase, lockorder []uint16) {
	var c *hchan
	for _, o := range lockorder {
		c0 := scases[o].c
		if c0 != c {
			c = c0
			lock(&c.lock)
		}
	}
}

func selunlock(scases []scase, lockorder []uint16) {
	// 我们必须非常小心,在解锁最后一把锁后不要触摸sel,因为sel可以在最后一次解锁后立即释放。
    //考虑以下情况。第一个M调用runtime·park()在runtime·selectgo()中传递sel。
    //一旦runtime·park()解锁了最后一个锁,另一个M会使调用select的G再次可运行,
    //并安排其执行。当G在另一个M上运行时,它锁定所有锁并释放sel。现在,如果第一个M触摸sel,它将访问释放的内存。
	for i := len(lockorder) - 1; i >= 0; i-- {
		c := scases[lockorder[i]].c
		if i > 0 && c == scases[lockorder[i-1]].c {
			continue // will unlock it on the next iteration
		}
		unlock(&c.lock)
	}
}

selectgo

selectgo 处理逻辑

// cas0指向[ncases]scase类型的数组,order0指向[2*ncases]uint16类型的数组(其中ncases必须<=65536)。
// 返回值有两个, chosen 和 recvOK,分别表示选中的case的序号,和对接收操作是否接收成功的布尔值
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {
	if debugSelect {
		print("select: cas0=", cas0, "\n")
	}
    //==== 执行必要的初始化操作,并生成处理case的两种顺序:轮询顺序polIorder和加锁顺序lockorder。
  // 为了将scase分配到栈上,这里直接给cas1分配了64KB大小的数组,同理, 给order1分配了128KB大小的数组
	// NOTE: In order to maintain a lean stack size, the number of scases
	// is capped at 65536.
	cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
	order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
    // ncases个数 = 发送chan个数+ 接收chan个数
	ncases := nsends + nrecvs
    // scases是cas1数组的前ncases个元素
	scases := cas1[:ncases:ncases]
    // 顺序列表pollorder是order1的0- ncases个元素
	pollorder := order1[:ncases:ncases]
    // 加锁列表lockorder是order1的ncase到 2 ncases 个元素
	lockorder := order1[ncases:][:ncases:ncases]
	// NOTE: 编译器初始化的pollorder/lockorder的基础数组不是零。
	// Even when raceenabled is true, there might be select
	// statements in packages compiled without -race (e.g.,
	// ensureSigM in runtime/signal_unix.go).
	var pcs []uintptr
	if raceenabled && pc0 != nil {
		pc1 := (*[1 << 16]uintptr)(unsafe.Pointer(pc0))
		pcs = pc1[:ncases:ncases]
	}
	casePC := func(casi int) uintptr {
		if pcs == nil {
			return 0
		}
		return pcs[casi]
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

    // 生成排列顺序
    norder := 0
	for i := range scases {
		cas := &scases[i]

		// Omit cases without channels from the poll and lock orders.
        // 处理case中channel为空的情况
		if cas.c == nil {
			cas.elem = nil // 便于GC
			continue
		}
       // 通过fastrandn函数引入随机性,确定pollorder列表中case的随机顺序索引
		j := fastrandn(uint32(norder + 1))
		pollorder[norder] = pollorder[j]
		pollorder[j] = uint16(i)
		norder++
	}
	// 重新生成列表
	pollorder = pollorder[:norder]
	lockorder = lockorder[:norder]

    // 根据chan地址确定lockorder加锁排序列表的顺序
    // 简单的堆排序,以保证nlogn时间复杂度完成排序
	for i := range lockorder {
		j := i
        // 从轮询顺序开始,在同一channel上排序。
		c := scases[pollorder[i]].c
		for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
			k := (j - 1) / 2
			lockorder[j] = lockorder[k]
			j = k
		}
		lockorder[j] = pollorder[i]
	}
	for i := len(lockorder) - 1; i >= 0; i-- {
		o := lockorder[i]
		c := scases[o].c
		lockorder[i] = lockorder[0]
		j := 0
		for {
			k := j*2 + 1
			if k >= i {
				break
			}
			if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
				k++
			}
			if c.sortkey() < scases[lockorder[k]].c.sortkey() {
				lockorder[j] = lockorder[k]
				j = k
				continue
			}
			break
		}
		lockorder[j] = o
	}

	if debugSelect {
		for i := 0; i+1 < len(lockorder); i++ {
			if scases[lockorder[i]].c.sortkey() > scases[lockorder[i+1]].c.sortkey() {
				print("i=", i, " x=", lockorder[i], " y=", lockorder[i+1], "\n")
				throw("select: broken sort")
			}
		}
	}

    // 锁定select中涉及的所有channel
	sellock(scases, lockorder)

	var (
		gp     *g
		sg     *sudog
		c      *hchan
		k      *scase
		sglist *sudog
		sgnext *sudog
		qp     unsafe.Pointer
		nextp  **sudog
	)

	// === pass 1 - 查找可以等待处理的channel
	var casi int
	var cas *scase
	var caseSuccess bool
	var caseReleaseTime int64 = -1
	var recvOK bool
	for _, casei := range pollorder {
		casi = int(casei) // case的索引
		cas = &scases[casi]  
		c = cas.c
		if casi >= nsends { // 处理接收channel的case
			sg = c.sendq.dequeue()
			if sg != nil {
                // 如果当前channel的sendq上有等待的goroutine,
                // 跳到recv代码 并从缓冲区读取数据后将等待goroutine中的数据放入到缓冲区中相同的位置
				goto recv
			}
			if c.qcount > 0 {
                //如果当前channel的缓冲区不为空,就会跳到bufrecv标签处从缓冲区获取数据;
				goto bufrecv
			}
			if c.closed != 0 {
                //如果当前channel已经被关闭,就会跳到rclose读取末尾数据和收尾工作;
				goto rclose
			}
		} else { // 处理发送channel的case
			if raceenabled {
				racereadpc(c.raceaddr(), casePC(casi), chansendpc)
			}
			if c.closed != 0 {
                // 如果当前channel已经被关闭就会直接跳到sclose标签(panic中止程序)
				goto sclose
			}
			sg = c.recvq.dequeue()
			if sg != nil {
                // 如果当前channel的recvq上有等待的goroutine,就会跳到 send标签向channel发送数据;
				goto send
			}
			if c.qcount < c.dataqsiz {
                // 如果当前channel的缓冲区存在空闲位置,就会将待发送的数据存入缓冲区;
				goto bufsend
			}
		}
	}

	if !block { // 如果是非阻塞,即包含default分支,解锁所有channel并返回
		selunlock(scases, lockorder)
		casi = -1
		goto retc
	}

	// === pass 2 - 将当前goroutine根据需要挂在chan的sendq或recvq上
	gp = getg() // 获取当前的groutine
	if gp.waiting != nil {
		throw("gp.waiting != nil")
	}
	nextp = &gp.waiting // 正在等待的sudog结构;按锁定顺序
	for _, casei := range lockorder {
		casi = int(casei)
		cas = &scases[casi]
		c = cas.c
		sg := acquireSudog()
        // 获取sudog,将当前goroutine绑定到sudog上
		sg.g = gp
		sg.isSelect = true
        // 在分配elem和在gp.waiting上排队sg之间没有堆栈分割,copystack可以找到它。
		sg.elem = cas.elem
		sg.releasetime = 0
		if t0 != 0 {
			sg.releasetime = -1
		}
		sg.c = c
        // 按锁定顺序构建waiting list 。
		*nextp = sg
		nextp = &sg.waitlink
        // 加入相应等待队列
		if casi < nsends {
			c.sendq.enqueue(sg)
		} else {
			c.recvq.enqueue(sg)
		}
	}

	// 被唤醒后会根据 param 来判断是否是由 close 操作唤醒的,所以先置为 nil
	gp.param = nil
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	atomic.Store8(&gp.parkingOnChan, 1)
    // 挂起当前goroutine
	gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
	gp.activeStackChans = false
    // 加锁所有的channel
	sellock(scases, lockorder)

	gp.selectDone = 0
	sg = (*sudog)(gp.param)
     // param 存放唤醒 goroutine 的 sudog,如果是关闭操作唤醒的,那么就为 nil
	gp.param = nil

	// === pass 3 - 当前 Goroutine 被唤醒之后找到满足条件的 Channel 并进行处理
    //dequeue from unsuccessful chans
	// otherwise they stack up on quiet channels
	// record the successful case, if any.
	// We singly-linked up the SudoGs in lock order.
    // 从不成功的通道中退出队列,否则它们会堆积在安静的通道上,记录成功的案例(如果有的话)。我们单独将SudoG按锁定顺序连接起来。


	casi = -1
	cas = nil
	caseSuccess = false
    // 当前goroutine 的 waiting 链表按照lockorder顺序存放着case的sudog
	sglist = gp.waiting
    // 在从 gp.waiting 取消case的sudog链接之前清除所有元素,便于GC
	for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
		sg1.isSelect = false
		sg1.elem = nil
		sg1.c = nil
	}
    // 清楚当前goroutine的waiting链表,因为被sg代表的协程唤醒了
	gp.waiting = nil

	for _, casei := range lockorder {
		k = &scases[casei]
        // 如果相等说明,goroutine是被当前case的channel收发操作唤醒的
		if sg == sglist {
            // sg唤醒了当前goroutine, 则当前G已经从sg的队列中出队,这里不需要再次出队
			casi = int(casei)
			cas = k
			caseSuccess = sglist.success
			if sglist.releasetime > 0 {
				caseReleaseTime = sglist.releasetime
			}
		} else {
            // 不是此case唤醒当前goroutine, 将goroutine从case对应的队列(发送或接收)出队
			c = k.c
			if int(casei) < nsends {
				c.sendq.dequeueSudoG(sglist)
			} else {
				c.recvq.dequeueSudoG(sglist)
			}
		}
         // 释放当前case的sudog,然后处理下一个case的sudog
		sgnext = sglist.waitlink
		sglist.waitlink = nil
		releaseSudog(sglist)
		sglist = sgnext
	}

	if cas == nil {
		throw("selectgo: bad wakeup")
	}

	c = cas.c

	if debugSelect {
		print("wait-return: cas0=", cas0, " c=", c, " cas=", cas, " send=", casi < nsends, "\n")
	}

	if casi < nsends {
		if !caseSuccess {
			goto sclose
		}
	} else {
		recvOK = caseSuccess
	}

	if raceenabled {
		if casi < nsends {
			raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
		} else if cas.elem != nil {
			raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc)
		}
	}
	if msanenabled {
		if casi < nsends {
			msanread(cas.elem, c.elemtype.size)
		} else if cas.elem != nil {
			msanwrite(cas.elem, c.elemtype.size)
		}
	}
	if asanenabled {
		if casi < nsends {
			asanread(cas.elem, c.elemtype.size)
		} else if cas.elem != nil {
			asanwrite(cas.elem, c.elemtype.size)
		}
	}

	selunlock(scases, lockorder)
	goto retc

bufrecv:
    // 能从buffer获取数据
	if raceenabled {
		if cas.elem != nil {
			raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc)
		}
		racenotify(c, c.recvx, nil)
	}
	if msanenabled && cas.elem != nil {
		msanwrite(cas.elem, c.elemtype.size)
	}
	if asanenabled && cas.elem != nil {
		asanwrite(cas.elem, c.elemtype.size)
	}
	recvOK = true
	qp = chanbuf(c, c.recvx)
	if cas.elem != nil {
		typedmemmove(c.elemtype, cas.elem, qp)
	}
	typedmemclr(c.elemtype, qp)
	c.recvx++
	if c.recvx == c.dataqsiz {
		c.recvx = 0
	}
	c.qcount--
	selunlock(scases, lockorder)
	goto retc

bufsend:
	// 发送数据到缓存
	if raceenabled {
		racenotify(c, c.sendx, nil)
		raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
	}
	if msanenabled {
		msanread(cas.elem, c.elemtype.size)
	}
	if asanenabled {
		asanread(cas.elem, c.elemtype.size)
	}
	typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
	c.sendx++
	if c.sendx == c.dataqsiz {
		c.sendx = 0
	}
	c.qcount++
	selunlock(scases, lockorder)
	goto retc

recv:
    // 从休眠sender(sg)接收
	recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
	if debugSelect {
		print("syncrecv: cas0=", cas0, " c=", c, "\n")
	}
	recvOK = true
	goto retc

rclose:
    // 读取结束的channel
	selunlock(scases, lockorder)
	recvOK = false
	if cas.elem != nil {
		typedmemclr(c.elemtype, cas.elem)
	}
	if raceenabled {
		raceacquire(c.raceaddr())
	}
	goto retc

send:
	// 想休眠的接收房发送数据
	if raceenabled {
		raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
	}
	if msanenabled {
		msanread(cas.elem, c.elemtype.size)
	}
	if asanenabled {
		asanread(cas.elem, c.elemtype.size)
	}
	send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
	if debugSelect {
		print("syncsend: cas0=", cas0, " c=", c, "\n")
	}
	goto retc

retc:
	if caseReleaseTime > 0 {
		blockevent(caseReleaseTime-t0, 1)
	}
	return casi, recvOK

sclose:
	// 向关闭的channel发送数据
	selunlock(scases, lockorder)
	panic(plainError("send on closed channel"))
}

总结

简单总结下select对case处理逻辑:

1- 空的case 会被golang监听到无法唤醒的协程,会panic

2- 如果只有一个case, 根据操作类型转换成 <- ch 或 成ch <- () (会跳用channel 的 chansend , chanrecv)

3- 如果一个default 一个非default 的case,非default会走 selectnbsend 和 selectnbrecv 非阻塞的方法(最后转换成if-else 语句)

4- 多个case 的情况下, cmd/compile/internal/walk/select.go 优化程序中:

4.1 对 scase 数组, selv ,order数组初始化,将case放在带执行列表中

4.2 调用selectgo函数,根据返回的chosen 结果来生成if语句,执行对应的case

selectgo 函数:

1- 随机生成一个便利case 的 轮询 poollorder, 根据channel 地址生成一个枷锁顺序的lockorder。(随机顺序保证公平性,加锁顺序能够避免思索)

2- 根据pollorder顺序查找cases是否包含立即处理的chan, 如果有就处理。没有处理的话,创建 sudo 结构,将当前的G 加入各case的channel 对应的 接收发送队列,等待其他G唤醒

3- 当调度器 唤醒当前的G,会按照lockorder ,访问所有的case。从中找到需要处理的case进行读写处理,同时从所有的case 的发送姐搜队列中移除当前的

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1372197.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

UCF101 数据集介绍与下载

一、介绍 UCF101 是一个现实动作视频的动作识别数据集&#xff0c;收集自YouTube&#xff0c;提供了来自101个动作类别的13320个视频。官方&#xff1a;https://www.crcv.ucf.edu/research/data-sets/ucf101/ 数据集名称&#xff1a;UCF-101&#xff08;2012&#xff09; 总视…

mapboxGL中区域掩膜的实现

概述 区域掩膜的功能也是比较常见的功能呢&#xff0c;本文分享在mapboxGL中结合canvas如何实现。 效果 实现 1. 创建画布 先创建一个map大小的canvas&#xff0c;设置其大小与样式&#xff0c;并添加到地图画布容器中。 const {width, height} map.getCanvas() canvas …

Vue入门三(表单控制|购物车案例|v-model进阶|与后端交互|计算属性|监听属性|Vue生命周期)

文章目录 一、表单控制二、购物车案例三、v-model进阶四、与后端交互跨域问题解决&#xff0c;三种交互方法跨域问题详解1-CORS&#xff1a;后端代码控制&#xff0c;上面案例采用的方式1) 方式一&#xff1a;后端添加请求头2) 方式二&#xff1a;编写中间件3) 方式三&#xff…

什么是线程?

线程 1. 线程概述 线程是计算机科学中的基本概念&#xff0c;指的是在一个进程中执行的独立指令流。通常&#xff0c;一个进程可以包含多个线程&#xff0c;它们共享进程的资源&#xff0c;如内存空间、文件句柄等&#xff0c;但每个线程有自己的独立执行流。线程是操作系统进…

Python图片格式转换与文字识别:技术与实践

目录 一、引言 二、Python图片格式转换 PIL库介绍 代码示例 质量优化 三、文字识别技术 四、Python实现文字识别 1、安装与配置OCR工具 2. 读取图片并提取文字 3. 优化与提高识别准确率 五、实践与应用案例 六、结论 一、引言 随着数字化时代的到来&#xff0c;图…

SOLIDWORKS 2024新功能之SOLIDWORKS PDM篇

SOLIDWORKS 2024 新功能 PDM篇目录概述 • 装配体直观 • 在 Web2 中下载文件的特定版本 • 文件类型图标 • “更改状态”命令中的签出选项 • 复制树对话框 • 查看检出事件详细信息 • 系统变量 • 查看许可证使用 • 数据安全增强功能 • SOLIDWORKS PDM 性能改进…

软件架构之事件驱动架构

一、定义 事件驱动的架构是围绕事件的发布、捕获、处理和存储&#xff08;或持久化&#xff09;而构建的集成模型。 某个应用或服务执行一项操作或经历另一个应用或服务可能想知道的更改时&#xff0c;就会发布一个事件&#xff08;也就是对该操作或更改的记录&#xff09;&am…

msckf_vio在ubuntu20.04中的编译

1.新建catkin workspace文件夹&#xff0c;并在其中新建src文件夹&#xff0c;并将源码clone至src内。 源码地址&#xff1a;https://github.com/KumarRobotics/msckf_vio 目录层级示意如下&#xff0c;build和devel不必新建&#xff0c;后续指令会自动新建。 2. 在编译之前…

虹科技术丨PCAN网关设备:打通通信壁垒,LED指示灯编程示例

来源&#xff1a;虹科汽车智能互联 虹科技术丨PCAN网关设备&#xff1a;打通通信壁垒&#xff0c;LED指示灯编程示例 原文链接&#xff1a;https://mp.weixin.qq.com/s/hpxssnDeD-43x3tyHJbAtA 欢迎关注虹科&#xff0c;为您提供最新资讯&#xff01; 导读 在工业自动化、汽…

python 文本内容随机生成器

这段代码是一个用于生成指定长度的随机文本的函数。主要包括两个函数&#xff1a;generate_text()和generate_other_content()。 generate_text(original_text, length)函数接受两个参数&#xff1a;原始文本和生成文本的长度。该函数的作用是根据原始文本生成指定长度的文本。…

竞赛保研 基于深度学习的人脸识别系统

前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 基于深度学习的人脸识别系统 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff0c;学长非常推荐&#xff01; &#x1f9ff; 更多资料, 项目分享&#xff1a; https://gitee.com/dancheng-senior/…

【SpringCloud】之网关应用(进阶使用)

&#x1f389;&#x1f389;欢迎来到我的CSDN主页&#xff01;&#x1f389;&#x1f389; &#x1f3c5;我是君易--鑨&#xff0c;一个在CSDN分享笔记的博主。&#x1f4da;&#x1f4da; &#x1f31f;推荐给大家我的博客专栏《SpringCloud开发之网关应用》。&#x1f3af;&a…

基于ChatGPT4+Python近红外光谱数据分析及机器学习与深度学习建模

022年11月30日&#xff0c;可能将成为一个改变人类历史的日子——美国人工智能开发机构OpenAI推出了聊天机器人ChatGPT3.5&#xff0c;将人工智能的发展推向了一个新的高度。2023年4月&#xff0c;更强版本的ChatGPT4.0上线&#xff0c;文本、语音、图像等多模态交互方式使其在…

python——数字精度控制

num1 11 num2 11.345 print("数字11宽度限制为5&#xff0c;结果%5d" % num1) print("数字11宽度限制为1&#xff0c;结果%1d" % num1) print("数字11.345宽度限制为7&#xff0c;小数精度为2结果%7.2f" % num2) print("数字11.345不限制…

课堂纪律差如何整治

在教育的世界里&#xff0c;有时候课堂纪律会成为一种挑战。那些在教室里大声喧哗、无视规则的学生&#xff0c;常常让老师们头疼不已。那么&#xff0c;面对课堂纪律差的问题&#xff0c;我们应该如何有效整治呢&#xff1f;下面就让我来为你揭晓这个问题的答案。 一、建立明确…

比特币惊现“天地针”!ETF终局将至,美证监会账号被盗!谁该对市场波动负责?

就在投资者神经紧绷时刻&#xff0c;万众期待的ETF批准事件再次闹出“假新闻”大乌龙&#xff0c;而这次的主角竟是美证监会。 美国东部时间周二下午4:11&#xff0c;美国证券交易委员会&#xff08;SEC&#xff09;官方X账户发布帖子称&#xff1a;“今天&#xff0c;美国证券…

数据结构之单调栈、单调队列

今天学习了单调栈还有单调队列的概念和使用&#xff0c;接下来我将对其定义并配合几道习题进行讲解&#xff1a; 首先先来复习一下栈与队列&#xff1a; 然后我们来看一下单调栈的定义&#xff1a; 单调栈中的元素从栈底到栈顶的元素的大小是按照单调递增或者单调递减的关系进…

九州金榜|厌学原因孩子情绪不稳定

孩子厌学是每个家长都不愿因看到&#xff0c;因为厌学会对孩子学习造成极大的影响&#xff0c;对于学习成绩下降这是必然的结果&#xff0c;所以&#xff0c;当孩子出现厌学情绪的时候&#xff0c;家长就会非常焦虑&#xff0c;但是对于孩子为什么会厌学&#xff0c;家长并不知…

烟火检测/区域人流统计/AI智能分析网关V4如何配置通道?

TSINGSEE青犀智能分析网关&#xff08;V4版&#xff09;是一款高性能、低功耗的软硬一体AI边缘计算硬件设备&#xff0c;硬件内部署了近40种AI算法模型&#xff0c;支持对接入的视频图像进行人、车、物、行为等实时检测分析&#xff0c;并上报识别结果&#xff0c;并能进行语音…

java SSM问卷调查系统myeclipse开发mysql数据库springMVC模式java编程计算机网页设计

一、源码特点 java SSM问卷调查管理系统是一套完善的web设计系统&#xff08;系统采用SSM框架进行设计开发&#xff0c;springspringMVCmybatis&#xff09;&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代 码和数据库&#xff0c;系统主要采…