在大多数的编程语言中,main函数都是用户程序的入口函数,go中也是如此。那么main.main是整个程序的入口吗, 肯定不是,因为go程序依赖于runtime,在程序的初始阶段需要初始化运行时,之后才会运行到用户的main函数,那么main.main是在哪里被调用的呢?接下来就从go程序的入口,再到go的GMP模型进行一个探究。
注意:本文使用的go sdk的版本为go1.20
文章目录
- 1.go程序的入口
- 2 GMP模型
- 2.1 GM模型
- 2.2 改进的GMP模型
- 2.3 相关数据结构
- 2.3.1 runtime.g
- g的状态:
- 2.3.2 runtime.m
- 2.3.3 runtime.p
- p的状态:
- 2.3.4 runtime.schedt
- 2.4 g0、m0
- 3 G的创建与退出
- 4 调度循环
- 4.1 runtime.schedule
- 4.2 runtime.findrunnable
- 4.3 runtime.execute、runtime.gogo
- 4.4 runtime.gopark、runtime.goready
- 4.5 work stealing和handoff机制
- 5 抢占式调度
- 5.1 异步抢占
- 6 系统监控线程sysmon
1.go程序的入口
1 首先,编写一个简单的go程序,并将其进行编译,在此使用linux系统:
package main
import "fmt"
func main() {
fmt.Println("hello,world")
}
编译:-N -l 用于阻止编译时进行优化和内联
go build -gcflags "-N -l" main.go
2 然后使用gdb来调试go程序:
首先,使用gdb加载支持调试go语言的脚本文件:
在shell中执行gdb
命令,然后执行source /usr/local/go/src/runtime/runtime-gdb.py
➜ RemoteWorking git:(master) ✗ gdb
(gdb) source /usr/local/go/src/runtime/runtime-gdb.py
3 调试程序:
gdb main
使用info files
来查看文件
可以看到程序的入口为0x45c020
, 在该处打上端点,可以看到入口为_rt0_amd64_linux
的函数它位于src/runtime/rt0_liunx_amd64.s
的汇编文件中:
TEXT _rt0_amd64_linux(SB),NOSPLIT,$-8
JMP _rt0_amd64(SB)
而该函数又调用了_rt0_amd64
,在asm_amd64.s
文件中:
TEXT _rt0_amd64(SB),NOSPLIT,$-8
MOVQ 0(SP), DI // argc
LEAQ 8(SP), SI // argv
JMP runtime·rt0_go(SB)
进而又跳转到了rt0_go
中:
TEXT runtime·rt0_go(SB),NOSPLIT|TOPFRAME,$0
...
MOVL 24(SP), AX // copy argc
MOVL AX, 0(SP)
MOVQ 32(SP), AX // copy argv
MOVQ AX, 8(SP)
CALL runtime·args(SB)
CALL runtime·osinit(SB)
CALL runtime·schedinit(SB)
// create a new goroutine to start program
MOVQ $runtime·mainPC(SB), AX // entry
PUSHQ AX
CALL runtime·newproc(SB)
POPQ AX
// start this M
CALL runtime·mstart(SB)
CALL runtime·abort(SB) // mstart should never return
RET
...
// mainPC is a function value for runtime.main, to be passed to newproc.
// The reference to runtime.main is made via ABIInternal, since the
// actual function (not the ABI0 wrapper) is needed by newproc.
DATA runtime·mainPC+0(SB)/8,$runtime·main<ABIInternal>(SB)
GLOBL runtime·mainPC(SB),RODATA,$8
在rt0_go
中先是进行了一些初始化,比如runtime.osinit
, runtime.schedinit
。
然后将runtime.mainPC
的地址放入AX寄存器中,然后调用了runtime.newproc
,根据下面的注释可以知道,mainPC就是runtime.main
,而newproc
则是创建goroutine
的函数。
我们通常在程序中使用 go func()
来启动一个协程,这是在go语言提供的一个语法糖,在编译时它会被翻译为newproc
的调用。因此,下面的几行代码则是创建了runtime.main
的goroutine
,也就是主goroutine,主goroutine被创建后,只是被放入了当前p的本地队列,但是还没有得到运行。
接下来调用了runtime.mstart
, 这个函数是除了sysmon
线程以外的其它线程的入口函数,最终该函数会调用schedule
函数,在schedule
函数中调用findrunnable
函数来获取一个可运行的goroutine
,然后调用execute来执行,execute
对goroutine
对应的g
结构体中的字段进行一些设置,然后调用gogo
来切换协程栈,并切换协程,因此main goroutine
将会被调度执行。
如下图所示:
2 GMP模型
GMP模型是go语言goroutine的调度系统,调度是将goroutine调度到线程上执行的过程,而操作系统的调度器则负责将线程调度到CPU上运行。
2.1 GM模型
go语言早期的调度模型为GM
模型,G
代表goroutine
,而M
代表一个线程,goroutine
和线程的数量有多个,那么调度器的职责就是将m
个goroutine
调度到n
个线程上来运行。待调度的goroutine
处于一个全局的调度队列globrunq
中,每个线程需要从globrunq
中获取goroutine
来执行,那么多个线程同时访问全局队列,为了保证线程间的同步,需要加锁,那么就会导致锁争用较大,从而降低系统的效率。
而且一个goroutine创建的goroutine也会被放入全局队列中,同时也需要加锁。这样也会造成程序的局部性较差,因为一个goroutine创建的另一个goroutine大概率不会在同一个线程上运行。
2.2 改进的GMP模型
为了改进之前的缺点:1 所有线程都从全局队列获取goroutine,造成锁争用强度大。2. 程序的局部性较差
go语言引入了GMP模型,G同样代表一个goroutine,M代表machine,也就是worker thread,p代表processor,包含了运行go代码所需的资源。
官方解释:
// Goroutine scheduler
// The scheduler's job is to distribute ready-to-run goroutines over worker threads.
//
// The main concepts are:
// G - goroutine.
// M - worker thread, or machine.
// P - processor, a resource that is required to execute Go code.
// M must have an associated P to execute Go code, however it can be
// blocked or in a syscall w/o an associated P.
线程是goroutine
运行的载体,goroutine
必须要在线程上运行。而一个线程想要运行goroutine
,就需要和一个p
进行关联,在每个p
中都包含了一个本地runq
,其中存放待运行的goroutine
,线程可以从本地runq中无锁访问,减少了锁竞争的力度。本地runq
的大小是有限的,最多可以存放256
个goroutine
。除此之外,还存在一个全局的globrunq
,当创建goroutine
时,优先放入相关联的p
的本地runq
,当本地runq
满了之后,新创建的goroutine
就会被添加到全局globrunq中。
p的数量
:p代表了一个逻辑处理器,p
的数量一般与CPU
的核心数相同,代表了可以并行运行的goroutine
的数量,可以通过runtime.GOMAXPROC
来设置。m的数量
:m表示一个线程,m的数量是不确定的,最大数量为10000个,但是正常情况下达不到这么大的数量。
2.3 相关数据结构
2.3.1 runtime.g
goroutine在runtime中表示为一个g结构体:
type g struct {
stack stack // offset known to runtime/cgo
stackguard0 uintptr // offset known to liblink
...
m *m // current m; offset known to arm liblink
sched gobuf
...
atomicstatus atomic.Uint32
goid uint64
preempt bool // preemption signal, duplicates stackguard0 = stackpreempt
}
type stack struct {
lo uintptr
hi uintptr
}
type gobuf struct {
sp uintptr
pc uintptr
g guintptr
...
}
省略了一些不太关心的字段,其中的一些字段的含义如下:
字段 | 用途 |
---|---|
stack | goroutine的栈空间,表示栈空间的一个界限 |
stackguard0 | 栈的上限,它的值一般是stack.lo+StackGuard,用于判断是否需要栈增长。由于goroutine的栈在初始化只有2K,并且是可以动态增长的,因此在函数调用时会判断栈空间是否够用,如果不够用会进行扩容。同时该字段可能会被设置为StackPreempt来表示抢占当前goroutine。 |
m | 关联到当前正在运行goroutine的m |
sched | 保存gouroutine的执行上下文,比如栈指针sp,程序计数器pc |
atomicstatus | 一个原子变量,表示goroutine的当前状态 |
goid | goroutine的id,goroutine的id由p进行分配,p会从全局缓存处取一批id缓存起来 |
preempt | 抢占标识,为true时,调度器会在合适的时机触发一次抢占 |
goroutine是一个有栈协程,stack字段用于描述协程的栈,goroutine的初始栈大小为2K,并且是从堆中分配的,是可以动态增长的。
sched用来存储goroutine执行的上下文,它与goroutine切换的底层实现相关,其中sp标识stack pointer,pc为program counter,g用来反向关联到当前g。
g的状态:
atomicstatus字段表示goroutine的状态,goroutine有多种状态:
状态 | 含义 |
---|---|
_Gidle | 当前goroutine刚被分配,还没有被初始化 |
_Grunnable | 当前goroutine处于待运行状态,他可能处于p的本地runq或者globrunq中,当前并没有在运行用户代码,它的栈也不归自己所有。 |
_Grunning | 当前goroutine正在运行用户代码,有关联的M和P。不会处于任何runq中,栈归该goroutine所有。 |
_Gsyscall | 当前goroutine正在执行系统调用,并没有在执行用户代码,拥有栈,而且被分配了M。 |
_Gwaiting | 当前goroutine处于阻塞状态,即不再runq中,也没有得到运行。它肯定被记录在某个地方,比如chan的阻塞队列、mutex的阻塞队列中。 |
_Gdead | 当前goroutine没有在使用,可能存在一个free list中或者刚刚被初始化。 |
_Gcopystack | 当前goroutine的栈正在被移动,没有在执行用户代码也不在一个runq中。 |
2.3.2 runtime.m
GMP中的M代表一个工作线程,在runtime中使用m结构体来表示:
type m struct {
g0 *g // goroutine with scheduling stack
gsignal *g // signal-handling g
curg *g // current running goroutine
p puintptr // attached p for executing go code (nil if not executing go code)
id int64
preemptoff string // if != "", keep curg running on this m
locks int32
spinning bool // m is out of work and is actively looking for work
mOS
}
省略了其中一些不太关心的字段,其中一些字段的含义如下:
字段 | 用途 |
---|---|
g0 | 每个工作线程都拥有一个g0,它的栈比普通线程的栈要大,是分配在线程栈上的。g0主要用来运行调度器代码,当需要调度新的协程运行时,就会切换到g0栈上来运行调度程序。 |
gsignal | 用来处理操作系统信号的goroutine |
curg | 指向当前正在运行的g |
p | 关联到的p |
id | 线程的唯一ID |
preemptoff | 不为空时表示要关闭对curg的抢占,字符串的内容给出了相关的原因 |
locks | 当前M持有锁的数量 |
spining | 表示当前线程处于自旋状态 |
mOS | 平台相关的线程 |
2.3.3 runtime.p
GMP中的p代表processor,其中包含了一系列用于运行goroutine的资源,比如本地runq、堆内存缓存、栈内存缓存、goroutine id缓存等,在runtime中使用p结构体表示:
type p struct {
id int32
status uint32 // one of pidle/prunning/...
schedtick uint32 // incremented on every scheduler call
syscalltick uint32 // incremented on every system call
sysmontick sysmontick // last tick observed by sysmon
m muintptr // back-link to associated m (nil if idle)
// Cache of goroutine ids, amortizes accesses to runtime·sched.goidgen.
goidcache uint64
goidcacheend uint64
// Queue of runnable goroutines. Accessed without lock.
runqhead uint32
runqtail uint32
runq [256]guintptr
runnext guintptr
// Available G's (status == Gdead)
gFree struct {
gList
n int32
}
// preempt is set to indicate that this P should be enter the
// scheduler ASAP (regardless of what G is running on it).
preempt bool
}
其中省略了一些不太关心的字段,一些字段的含义如下:
字段 | 用途 |
---|---|
id | p的唯一ID,等于在allp数组中的下标 |
status | 表示p的状态 |
schedtick | 记录了调度发生的次数,每调度一次goroutine并且不继承时间片的情况下,将该字段加1 |
syscalltick | 记录发生系统调用的次数 |
sysmontick | 被监控线程用来存储上一次检查时的调度器时钟滴答,用以实现时间片算法 |
m | 当前关联的m |
goidcache、goidcacheend | goroutine id缓存,会从全局缓存中申请一批来减少锁争用 |
runqhead、runqtail、runq | 本地goroutine运行队列,使用一个数组和一头一尾组成一个环形队列 |
runnext | 如果不为nil,则指向一个被当前G准备好的就绪的G,接下来会继承当前G的时间片开始运行。 |
gFree | 用来缓存已经推出的g,方便下次申请时复用 |
preempt | 该字段用于支持异步抢占机制 |
p的状态:
状态 | 含义 |
---|---|
_Pidle | 当前p处于空闲状态,没有被用于执行用户代码或调度。p处于idle list中,它的本地runq是空的 |
_Prunning | 当前p与一个m进行关联并且被用于执行用户代码或者调度 |
_Psyscall | 当前p没有在运行用于代码,它与系统调用中的M有亲和关系,但不属于它,并且可能被另一个M窃取。这类似于_Pidle,但使用轻量级转换并维护M亲和关系。 |
_Pgcstop | 当前p因为STW而停止 |
_Pdead | 停用状态,因为GOMAXPROC可用收缩,会造成多余的p被停用。一旦GOMAXPROC重新增长,那么停用的p会被重新启用。 |
2.3.4 runtime.schedt
还有另一个和调度相关的数据结构需要关注,就是runtime.schedt,其中包含了调度的一些全局数据,schedt类型的实例只会存在一个:
var (
allm *m // 所有m组成一个链表
gomaxprocs int32 // 对应与GOMAXPROC
ncpu int32 // CPU核心数
sched schedt // 调度器相关的数据结构
allpLock mutex // 保护allp的锁
allp []*p // 所有的p
)
schedt结构如下:
其中全局runq就存在与schedt结构中
type schedt struct {
goidgen atomic.Uint64
midle muintptr // idle m's waiting for work
nmidle int32 // number of idle m's waiting for work
mnext int64 // number of m's that have been created and next M ID
maxmcount int32 // maximum number of m's allowed (or die)
nmsys int32 // number of system m's not counted for deadlock
nmfreed int64 // cumulative number of freed m's
ngsys atomic.Int32 // number of system goroutines
pidle puintptr // idle p's
npidle atomic.Int32
nmspinning atomic.Int32 // See "Worker thread parking/unparking" comment in proc.go.
// Global runnable queue.
runq gQueue
runqsize int32
// Global cache of dead G's.
gFree struct {
lock mutex
stack gList // Gs with stacks
noStack gList // Gs without stacks
n int32
}
}
字段 | 用途 |
---|---|
goidgen | 全局goid的分配器,以保证goid的唯一性。P中的goidcache就是从这里批量获取的。 |
midle | 空闲M链表的链表头 |
nmidle | 空闲M的数量 |
mnext | 记录共创建了多少个M,同时也被用于下一个M的ID |
maxmcount | 允许创建的M的最大数量 |
nmsys | 系统M的数量 |
nmfreed | 统计已经释放的M的数量 |
ngsys | 系统goroutine的数量 |
pidle | 空闲P链表的表头 |
npidle | 空闲P的数量 |
nmspining | 处于自旋状态的M的数量 |
qunq、runqsize | 全局就绪goroutine队列,需要加锁访问 |
gFree | 用来缓存已经退出的g |
2.4 g0、m0
在每个工作线程M中都存在一个g0,g0的主要功能就是执行调度程序,当需要执行调度程序时会将运行栈切换的g0栈,然后运行调度程序来寻找一个就绪的goroutine并切换运行。
有两个函数可用切换到g0栈来运行:
func mcall(fn func(*g))
func systemstack(fn func())
- mcall:将调用mcall的协程栈切换的g0栈并且在g0栈上运行fn,mcall仅可以被除了g0、gsingal之外的g调用。
- systemstack:在系统栈上运行fn,然后再切换回来。
m0为进程的第一个线程,也就是运行main goroutine的线程
3 G的创建与退出
我们再程序中通常使用下面的方式来创建一个goroutine:
go func()
这只是go语言为我们提供的一个语法糖,事实上,在编译时,该方式会被翻译为对runtime.newproc
函数的调用,newproc
用于创建一个新的goroutine
,并将其添加到就绪队列中。
我们可用通过将代码编译为汇编来查看go func()是怎么执行的:
将下面的示例代码编译为汇编程序:
go build -gcflags -S main.go
package main
import (
"fmt"
"time"
)
func print() {
fmt.Println("hello, GMP")
time.Sleep(time.Second)
}
func main() {
go print()
select {}
}
汇编代码如下:
"".main STEXT size=50 args=0x0 locals=0x10 funcid=0x0 align=0x0
0x0000 00000 (/root/RemoteWorking/main.go:13) TEXT "".main(SB), ABIInternal, $16-0
0x0000 00000 (/root/RemoteWorking/main.go:13) CMPQ SP, 16(R14)
0x0004 00004 (/root/RemoteWorking/main.go:13) PCDATA $0, $-2
0x0004 00004 (/root/RemoteWorking/main.go:13) JLS 43
0x0006 00006 (/root/RemoteWorking/main.go:13) PCDATA $0, $-1
0x0006 00006 (/root/RemoteWorking/main.go:13) SUBQ $16, SP
0x000a 00010 (/root/RemoteWorking/main.go:13) MOVQ BP, 8(SP)
0x000f 00015 (/root/RemoteWorking/main.go:13) LEAQ 8(SP), BP
0x0014 00020 (/root/RemoteWorking/main.go:13) FUNCDATA $0, gclocals·33cdeccccebe80329f1fdbee7f5874cb(SB)
0x0014 00020 (/root/RemoteWorking/main.go:13) FUNCDATA $1, gclocals·33cdeccccebe80329f1fdbee7f5874cb(SB)
0x0014 00020 (/root/RemoteWorking/main.go:14) LEAQ "".print·f(SB), AX
0x001b 00027 (/root/RemoteWorking/main.go:14) PCDATA $1, $0
0x001b 00027 (/root/RemoteWorking/main.go:14) NOP
0x0020 00032 (/root/RemoteWorking/main.go:14) CALL runtime.newproc(SB)
可以看到,print函数的地址被保存在了AX
寄存器中,然后调用了runtime.newproc
函数
runtime.newproc代码如下
:
func newproc(fn *funcval) {
gp := getg() // 获取当前g
pc := getcallerpc()
systemstack(func() {
newg := newproc1(fn, gp, pc) // 创建一个新的goroutine
pp := getg().m.p.ptr() // 获取当前g运行的m关联的p
runqput(pp, newg, true) // 将新的goroutine加入到就绪队列中
if mainStarted {
wakep() // 唤醒新的p
}
})
}
runqput
会优先将新创建的goroutine
放入当前p
的runnext
中。如果runnext
已经有goroutine
了,则会将旧的goroutine
放入本地队列中,如果本地队列满了,那么则会将旧的goroutine
以及本地队列一半的goroutine
放入全局队列中。
newproc的主要逻辑就是创建了一个新的g,并将其放入当前g运行的m关联的p的本地runq中。
newproc1的代码如下:
func newproc1(fn *funcval, callergp *g, callerpc uintptr) *g {
...
mp := acquirem() // 禁止抢占
pp := mp.p.ptr() // 获取当前m关联的p
newg := gfget(pp) // 从p的缓存中获取一个g
if newg == nil {
newg = malg(_StackMin) // 如果从缓存中获取不到,则新创建一个,_StackMin的值为2048,也就是2K
casgstatus(newg, _Gidle, _Gdead)
allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
}
...
// goexit函数被放在了pc上,gostartcallfn会对其进行特殊处理
newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
newg.sched.g = guintptr(unsafe.Pointer(newg))
gostartcallfn(&newg.sched, fn)
newg.gopc = callerpc
newg.ancestors = saveAncestors(callergp)
newg.startpc = fn.fn
...
casgstatus(newg, _Gdead, _Grunnable) // 改变g的状态
newg.goid = pp.goidcache // 分配goid
pp.goidcache++
releasem(mp)
return newg
}
func gostartcallfn(gobuf *gobuf, fv *funcval) {
var fn unsafe.Pointer
if fv != nil {
fn = unsafe.Pointer(fv.fn)
} else {
fn = unsafe.Pointer(abi.FuncPCABIInternal(nilfunc))
}
gostartcall(gobuf, fn, unsafe.Pointer(fv))
}
func gostartcall(buf *gobuf, fn, ctxt unsafe.Pointer) {
sp := buf.sp
sp -= goarch.PtrSize
*(*uintptr)(unsafe.Pointer(sp)) = buf.pc // 在goroutine的栈帧中插入了goexit函数
buf.sp = sp
buf.pc = uintptr(fn)
buf.ctxt = ctxt
}
在newproc1中获取了一个g实例,对其中的字段进行了设置,为其分配id,并修改状态为*_Grunnable*。特别需要注意的时,在gostartcall函数中,往goroutine的栈帧中插入了一个goexit函数,因此当goroutine从运行的函数退出时,就会返回到goexit函数中。
使用goland调试go程序时,可以从调用栈中查看到runtime.goexit函数,仿佛是runtime.goexit函数调用了runtime.main,而runtime.main又调用了main.main函数
而runtime.goexit是一段使用汇编实现的代码:
TEXT runtime·goexit(SB),NOSPLIT|TOPFRAME,$0-0
BYTE $0x90 // NOP
CALL runtime·goexit1(SB) // does not return
// traceback from goexit1 must hit code range of goexit
BYTE $0x90 // NOP
而其中又调用了runtime.goexit1
函数:
func goexit1() {
mcall(goexit0)
}
func goexit0(gp *g) {
// 重置g的状态
mp := getg().m
pp := mp.p.ptr()
casgstatus(gp, _Grunning, _Gdead)
gcController.addScannableStack(pp, -int64(gp.stack.hi-gp.stack.lo))
if isSystemGoroutine(gp, false) {
sched.ngsys.Add(-1)
}
gp.m = nil
locked := gp.lockedm != 0
gp.lockedm = 0
mp.lockedg = 0
gp.preemptStop = false
gp.paniconfault = false
gp._defer = nil // should be true already but just in case.
gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.
gp.writebuf = nil
gp.waitreason = waitReasonZero
gp.param = nil
gp.labels = nil
gp.timer = nil
dropg() // 将当前g从m移除
gfput(pp, gp) //将g放入p的gFreelist中
schedule() // 触发新一轮的调度
}
从runtime.goexit到runtime.goexit1,最终到runtime.goexit0函数中,对g的状态进行了重置,然后将g从m中移除,放入p的gFree List中,,以便后续重用。然后调用了scheduler函数,scheduler函数正是调度的入口,如此一来便形成了一个闭环。
总结
:当我们使用go func()来启动一个goroutine时,实际上是调用了newproc函数来创建一个新的goroutine,然后将新的goroutine加入到当前m关联的p的本地队列中。后续,goroutine会得到调度运行,当goroutine运行结束后,会进入goexit函数中,对该g进行回收,然后调用schedule触发新一轮的调度。
4 调度循环
go的调度器会不断调度goroutine到线程上运行,当一个goroutine结束运行、发生阻塞、主动让出、或者时间片用尽时就会触发新一轮的调度,重新选择一个goroutine来运行。整个流程如下:
mstart
:mstart是工作线程的入口函数,最终会触发schedule来进行goroutine的调度schedule
:是调度循环的开始,寻找可运行的g,并调用execute运行execute
:对g的状态进行设置,调用gogo来切换goroutineuser code
:用户代码,在执行用户代码时有多种方式会触发重新调度,比如1.用户代码执行完毕,通过goexit退出到schedule函数中;2.用户代码发生阻塞(操作chan、获取锁、读取网络数据等),发生阻塞时会调用gopark来切换goroutine,最终也会进入schedule函数中;3.runtime.Gosched,用户主动调用Gosched让出当前goroutine的执行权。
4.1 runtime.schedule
工作线程通过schedule
函数来触发一次调度,该函数是调度逻辑的主要实现,schedule
函数会在g0
栈上运行。
代码如下:
func schedule() {
mp := getg().m
// 线程持有锁时,不能进行调度,以免造成runtime内部错误
if mp.locks != 0 {
throw("schedule: holding locks")
}
// 判断当前M有没有和G绑定,如果有,这个M就不能用来执行其它的G
if mp.lockedg != 0 {
stoplockedm()
execute(mp.lockedg.ptr(), false) // Never returns.
}
// 判断是否在进行cgo调用,如果在就不能进行调度,因为g0栈正在被cgo使用
if mp.incgo {
throw("schedule: in cgo")
}
top:
pp := mp.p.ptr() // 获取当前m关联的p
pp.preempt = false
if mp.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) {
throw("schedule: spinning with local work")
}
// 寻找一个可运行的g,阻塞直到找到
gp, inheritTime, tryWakeP := findRunnable() // blocks until work is available
// 如果当前线程正在自旋寻找新的工作,因为已经找到工作了,重置自旋状态
if mp.spinning {
resetspinning()
}
...
// If about to schedule a not-normal goroutine (a GCworker or tracereader),
// wake a P if there is one.
if tryWakeP {
wakep()
}
if gp.lockedm != 0 {
// Hands off own p to the locked m,
// then blocks waiting for a new p.
startlockedm(gp)
goto top
}
// 调用execute来运行g
execute(gp, inheritTime)
}
在schedule
中,首先是进行一些检测,比如在线程持有锁时,不能进行调度;如果当前线程一个G
进行了绑定,那么就不能用于调度其它的G
运行。同时判断是否在进行cgo
调用,在执行cgo
调用时会使用g0
栈,因此也不可以进行调度。
接下来就是调用findrunnable
函数来寻找一个可运行的g
,最终调用execute
来运行该g
。
4.2 runtime.findrunnable
findrunnable的主要逻辑就是寻找一个处于_Runnable
状态的goroutine
,
首先,如果启动了trace
,则会获取trace reader
func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
mp := getg().m
// The conditions here and in handoffp must agree: if
// findrunnable would return a G to run, handoffp must start
// an M.
...
// Try to schedule the trace reader.
if trace.enabled || trace.shutdown {
gp := traceReader()
if gp != nil {
casgstatus(gp, _Gwaiting, _Grunnable)
traceGoUnpark(gp, 0)
return gp, false, true
}
}
...
}
如果没有启动trace或者获取不到trace reader
,则会查询gc
的标记工作是否启动,如果启动了,则尝试获取一个GC Worker
来执行标记任务。
// Try to schedule a GC worker.
if gcBlackenEnabled != 0 {
gp, tnow := gcController.findRunnableGCWorker(pp, now) // 尝试获取一个GC Worker
if gp != nil {
return gp, false, true
}
now = tnow
}
为了保证两个goroutine
交替执行,从而导致全局队列中的goroutine
饥饿的问题,每进行61
次调度,就会从全局队列中取出一个goroutine
来运行,p.schedtick
记录了调度的次数。
// Check the global runnable queue once in a while to ensure fairness.
// Otherwise two goroutines can completely occupy the local runqueue
// by constantly respawning each other.
if pp.schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp := globrunqget(pp, 1) // 每执行61次调度,就从全局队列中获取一个goroutine来运行
unlock(&sched.lock)
if gp != nil {
return gp, false, false
}
}
如果还没获取到,则从p
的本地runq
中获取一个goroutine
。
// local runq
if gp, inheritTime := runqget(pp); gp != nil { // 从p的本地runq中获取
return gp, inheritTime, false
}
如果p
的本地runq
中也没有可运行的goroutine
,那么则会从全局队列中取出一批goroutine
。因为访问全局runq
需要加锁,因此会从中获取一批goroutine
并加入到p
的本地runq
中。
// global runq
if sched.runqsize != 0 {
lock(&sched.lock)
gp := globrunqget(pp, 0)
unlock(&sched.lock)
if gp != nil {
return gp, false, false
}
}
如果全局队列中也没有可运行的goroutine
,就会尝试使用netpoller
来轮询网络,从而获取可运行的goroutine
。
// 如果netpoller启动了,并且其中管理的fd数量大于0,调用netpoll来轮询网络,以此来获取在网络中就绪的goroutine
if netpollinited() && netpollWaiters.Load() > 0 && sched.lastpoll.Load() != 0 {
if list := netpoll(0); !list.empty() { // non-blocking
gp := list.pop() // 获取到一批goroutine,组成一个链表,获取链表头第一个
injectglist(&list) // 将其它goroutine放入本地runq中
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false, false
}
}
如果上面的方法都无法获取一个待运行goroutine
,则会选择从其它P的本地runq
中偷取一批goroutine
。为了充分利用多核cpu
的并行性,并不会将当前线程挂起,而是尝试从其它P
那偷取工作,防止P
忙的忙死、闲的闲死。任务窃取会循环尝试四次,从allp
中选择一个P
,并从其中窃取工作,每次任务窃取都会随机选择一个下标开始窃取,以保证公平性。
if mp.spinning || 2*sched.nmspinning.Load() < gomaxprocs-sched.npidle.Load() {
if !mp.spinning {
mp.becomeSpinning()
}
gp, inheritTime, tnow, w, newWork := stealWork(now) // 从其它P那偷取工作
if gp != nil {
// Successfully stole.
return gp, inheritTime, false
}
if newWork {
// There may be new timer or GC work; restart to
// discover.
goto top
}
now = tnow
if w != 0 && (pollUntil == 0 || w < pollUntil) {
// Earlier timer to wait for.
pollUntil = w
}
}
4.3 runtime.execute、runtime.gogo
在runtime.execute
中将要执行寻找到的g
,设置g
的状态,最后调用gogo
来切换goroutine
运行。gogo
是一段由汇编实现的代码,主要逻辑就是切换协程栈,恢复选中的goroutine
的执行。
func execute(gp *g, inheritTime bool) {
mp := getg().m
...
mp.curg = gp // 设置当前运行的g
gp.m = mp // 关联当前的m
casgstatus(gp, _Grunnable, _Grunning) // 将当前g的状态切换为_Grunning
gp.waitsince = 0
gp.preempt = false
gp.stackguard0 = gp.stack.lo + _StackGuard
if !inheritTime {
mp.p.ptr().schedtick++
}
...
gogo(&gp.sched) // 调用gogo来切换协程
}
4.4 runtime.gopark、runtime.goready
gopark
将当前正在运行的goroutine
挂起(状态为_Gwaiting
),并触发新一轮的调度:
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
...
// can't do anything that might move the G between Ms here.
mcall(park_m)
}
func park_m(gp *g) {
mp := getg().m
if trace.enabled {
traceGoPark(mp.waittraceev, mp.waittraceskip)
}
// 修改g的状态为Gwaiting
casgstatus(gp, _Grunning, _Gwaiting)
dropg()
if fn := mp.waitunlockf; fn != nil {
ok := fn(gp, mp.waitlock)
mp.waitunlockf = nil
mp.waitlock = nil
if !ok {
if trace.enabled {
traceGoUnpark(gp, 2)
}
casgstatus(gp, _Gwaiting, _Grunnable)
execute(gp, true) // Schedule it back, never returns.
}
}
// 触发新一轮的调度
schedule()
}
当goroutine
发生阻塞时,通常会调用gopark
来触发调度。比如,当读取一个空的chan
时,goroutine
就会被放入chan
的读阻塞队列中,然后调用gopark
来切换协程。
goready
通常用来唤醒一个协程,比如,当另一个协程往chan
中写入数据时,就需要负责唤醒读阻塞的协程。goready
通过systemstack
切换到g0
栈并运行ready
,在ready
中将goroutine
的状态切换为_Grunnable
并且添加到runq中
,该goroutine
后面便会得到调度执行。
func goready(gp *g, traceskip int) {
systemstack(func() {
ready(gp, traceskip, true)
})
}
func ready(gp *g, traceskip int, next bool) {
status := readgstatus(gp)
// Mark runnable.
mp := acquirem() // disable preemption because it can be holding p in a local var
if status&^_Gscan != _Gwaiting {
dumpgstatus(gp)
throw("bad g->status in ready")
}
// 将g的状态切换为_Grunnable
casgstatus(gp, _Gwaiting, _Grunnable)
// 将g添加到runq中
runqput(mp.p.ptr(), gp, next)
// 唤醒新的p
wakep()
releasem(mp)
}
4.5 work stealing和handoff机制
work stealing
当一个线程没有可用的工作并且从全局队列中也找不到时,该线程并不会立马陷入休眠或者被销毁,而且尝试从其它P中窃取一部分的工作来运行。
handoff
当一个goroutine处于系统调用时,可能会导致整个线程发生阻塞。为了充分利用多核CPU,当前P会与M进行解绑,并且寻找或创建一个新的M来运行工作。
5 抢占式调度
就像操作系统的调度器负责线程的调度一样,go
的调度器负责goroutine
的调度。现代操作系统调度器都是抢占式
的,基于经典的时间片
算法。当一个线程的时间片用完后,便会触发时钟中断
,调度器将其执行的上下文
进行保存,然后选择下一个线程,恢复其执行上下文
,分配新的时间片,令其开始执行。这种抢占式对于线程本身是无感知的,由操作系统提供支持。
基于时间片算法的调度有一个明显的优点,能够避免一个线程持续占有CPU
资源,从而使其它线程长期处于饥饿状态。goroutine
的调度也用到了时间片算法(每个goroutine 10ms
),但是和操作系统的调度还是有明显区别的。因为go
的调度程序完全工作在用户态
,无法使用时钟中断
这种方式来触发调度。也得益于用户态的实现,go
的调度器要更加轻量。
除了goroutine
退出、阻塞以及用户调用Gosched
主动让出的情况,go调度器的抢占式调度实际上是通过hook的方式来实现的
。由于goroutine
的栈比较小,因此在调用函数时,需要检测栈是否够用,如果不够用,则会触发栈增长。因此,go
程序在编译时,会在发生函数调用处插入栈检测的相关代码
,而在栈检测的代码中也包含了抢占调度
的逻辑。
编译器会在发生函数调用之前插入runtime.morestack
或runtime.morestack_noctxt
函数:
这两个函数是由汇编实现的,最终它们都会调用runtime.newstack
函数:
抢占相关的代码如下:
func newstack() {
...
// 加载 stackguard0
stackguard0 := atomic.Loaduintptr(&gp.stackguard0)
// 判断stackguard0是否被标记为了抢占
preempt := stackguard0 == stackPreempt
if preempt {
...
// 触发抢占
gopreempt_m(gp) // never return
}
}
func gopreempt_m(gp *g) {
...
goschedImpl(gp)
}
func goschedImpl(gp *g) {
status := readgstatus(gp)
if status&^_Gscan != _Grunning {
dumpgstatus(gp)
throw("bad g status")
}
casgstatus(gp, _Grunning, _Grunnable)
dropg()
lock(&sched.lock)
globrunqput(gp)
unlock(&sched.lock)
// 触发新的一轮调度
schedule()
}
在newstack
中会判断stackguard0
是否被设置为了抢占标识,也就是stackPreempt
,值为0xfffffade
,是一个很大的数,正常的栈不会达到该值。
系统监控线程会检测goroutine
的运行时间,一旦一个goroutine
的运行时间超过了其分配的时间片,就会将当前goroutine
设置为抢占。因此当goroutine
发生函数调用前,会进入编译器插入的morestack
函数,然后进行抢占判断,如果该goroutine
被标识为抢占,则最终会进入schedule
函数触发一次新的调度。
这种方式似乎看起来很完美,但是存在一个问题。就是如果一个goroutine
不发生函数调用,也不会阻塞更不会主动让出,比如一个无限循环的计算任务,那么它就会一直占用CPU
,甚至会导致程序卡死。
5.1 异步抢占
在go1.13
版本前,只有上面的抢占方式,因此可能会导致一些问题,接下来做一个实验,需要的go版本为小于go1.13
。
package main
import "fmt"
func fn(i int) {
for {
i++
fmt.Println(i)
}
}
func main() {
go fn(0)
for {
}
}
程序启动后,会一直打印数字,但是在我机器上打印到15万多的时候就会停止,整个程序卡死了。
在fn函数中会调用fmt.Println
函数打印数组,而在该函数中会进行内存分配,当内存分配到一定量就会触发GC
。开始GC
前需要STW(Stop The World
)因此需要对所有的线程进行抢占,但是main goroutien没有发生函数调用,也就无法对其抢占,因此造成了死锁
。
在go1.14
版本后引入了异步抢占机制
,这样的事情就不会发生了。
那么只要有一种机制可以让没有发生函数调用的goroutine
可以被打断,跳转到一个函数中去,那么不就可以对其进行抢占了吗。
异步抢占的主要机制是基于操作系统信
号,当系统监控线程检测到一个goroutine
执行了过长的时间,就会发出异步抢占,也就是给该goroutine
所在线程发送一个信号
,线程收到信号后就会跳转到提前注册的信号处理函数sigHandle
r中,从而就可以对该goroutine
进行抢占了。
用作抢占的信号正是SIGURG
,这个信号很少使用,可以用来实现基于信号的异步抢占。
信号处理函数sigHandler
中抢占机制的代码如下:
func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
...
// 如果当前信号是sigPreempt 而且 开启了异步抢占
if sig == sigPreempt && debug.asyncpreemptoff == 0 && !delayedSignal {
//进行抢占
doSigPreempt(gp, c)
}
...
}
doSigPreempt
会在goroutine
的栈中注入一个asyncPreempt
的调用,因此当信号处理函数返回,重新回到goroutine
中时就会执行asyncPreempt
函数
func doSigPreempt(gp *g, ctxt *sigctxt) {
...
if wantAsyncPreempt(gp) {
if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {
// Adjust the PC and inject a call to asyncPreempt.
ctxt.pushCall(abi.FuncPCABI0(asyncPreempt), newpc)
}
}
...
}
asyncPreempt
是由汇编实现的,它又会调用asyncPreempt2
,最终又会回到schedule
中触发一次新的调度。
func asyncPreempt2() {
...
mcall(gopreempt_m)
...
}
func gopreempt_m(gp *g) {
...
goschedImpl(gp)
}
func goschedImpl(gp *g) {
...
schedule()
}
6 系统监控线程sysmon
系统监控线程sysmon(system mointor)
是一个特殊的线程,它会在go程序执行期间常驻,它不需要和P绑定就可以执行,它的工作主要是负责对系统的情况进行检测,并协调系统的运行。
系统监控线程会在runtime.main
执行时被创建:
sysmon
是这个线程的入口函数:
sysmon
的主要任务如下:
死锁检测
轮询网络
:当其他P都比较繁忙时,它会负责轮询网络,并将就绪的goroutine加入全局队列中夺取处于系统调用中的P
:当一个goroutine因为系统调用而阻塞时,sysmon会将线程绑定的P hand off出去,它可以寻找或创建一个新的M来继续运行抢占长时间运行的G
:当一个goroutine运行时间多长时,sysmon会将其标记为抢占。如果支持异步抢占,则会执行异步抢占周期性触发GC
:GC的时间周期为2min
func sysmon() {
lock(&sched.lock)
sched.nmsys++
// 1.死锁检测
checkdead()
unlock(&sched.lock)
...
for {
// 休眠一定时间
if idle == 0 { // start with 20us sleep...
delay = 20
} else if idle > 50 { // start doubling the sleep after 1ms...
delay *= 2
}
if delay > 10*1000 { // up to 10ms
delay = 10 * 1000
}
usleep(delay)
...
// poll network if not polled for more than 10ms
// 2.轮询网络如果截至上次已经超过10ms
lastpoll := sched.lastpoll.Load()
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
sched.lastpoll.CompareAndSwap(lastpoll, now)
list := netpoll(0) // non-blocking - returns list of goroutines
if !list.empty() {
incidlelocked(-1)
injectglist(&list)
incidlelocked(1)
}
}
...
// retake P's blocked in syscalls
// and preempt long running G's
// 3.夺回阻塞与系统调用中的P
// 4.抢占长时间运行的G
if retake(now) != 0 {
idle = 0
} else {
idle++
}
// check if we need to force a GC
// 5.周期性触发GC
if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && forcegc.idle.Load() {
lock(&forcegc.lock)
forcegc.idle.Store(false)
var list gList
list.push(forcegc.g)
injectglist(&list)
unlock(&forcegc.lock)
}
if debug.schedtrace > 0 && lasttrace+int64(debug.schedtrace)*1000000 <= now {
lasttrace = now
schedtrace(debug.scheddetail > 0)
}
unlock(&sched.sysmonlock)
}
}