golang 调度过程源码分析


golang 版本:go version go1.15.2 darwin/amd64

根据启动函数来分析golang MPG的生存周期,忽略cgo相关代码。

MPG

MPG是golang调度的重要对象:

  • M,表示一个内核线程,是执行用户代码的实际场所
  • P,表示一个处理器,管理M需要运行G的相关资源,如内存分配,G的可执行列表,G的空闲列表等
  • G,表示一个goroutine,调度基本单元,维护goroutine内部资源,如栈信息,defer列表等

核心结构体

G (goroutine)

G 表示一个 goroutine,是调度的基本单元。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// runtime/runtime2.go

type g struct {
// 栈信息
stack stack // 栈边界 [stack.lo, stack.hi)
stackguard0 uintptr // 用于栈溢出检查
stackguard1 uintptr // 用于栈溢出检查

// 调度相关
_panic *_panic // panic 链表
_defer *_defer // defer 链表
m *m // 当前绑定的 m
sched gobuf // 调度上下文,保存 goroutine 的寄存器状态
syscallsp uintptr // 系统调用时的栈指针
syscallpc uintptr // 系统调用时的程序计数器
stktopsp uintptr // 期望的栈顶指针
param unsafe.Pointer // 唤醒时的参数
atomicstatus uint32 // goroutine 状态
stackLock uint32 // 栈锁
goid int64 // goroutine ID

// 抢占相关
preempt bool // 抢占标志
preemptStop bool // 抢占停止标志
preemptShrink bool // 抢占收缩标志

// 等待相关
waitsince int64 // 等待开始时间
waitreason waitReason // 等待原因
lockedm muintptr // 锁定的 m
sig uint32 // 信号

// 其他
gopc uintptr // 创建该 goroutine 的 PC
startpc uintptr // goroutine 函数入口
racectx uintptr
waiting *sudog // 等待队列
cgoCtxt []uintptr // cgo 回溯
labels unsafe.Pointer
timer *timer // 定时器
selectDone uint32 // select 完成标志
gcAssistBytes int64 // GC 辅助字节数
}

关键字段说明

  • stack: goroutine 的栈空间
  • sched: 保存 goroutine 的调度上下文(PC、SP、BP 等寄存器)
  • atomicstatus: goroutine 的状态(_Gidle, _Grunnable, _Grunning, _Gsyscall, _Gwaiting, _Gdead 等)
  • m: 当前执行该 goroutine 的 M
  • goid: goroutine 的唯一标识符

M (machine)

M 表示一个内核线程,是执行用户代码的实际场所。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// runtime/runtime2.go

type m struct {
g0 *g // 调度器使用的 goroutine,拥有较大的栈
curg *g // 当前正在执行的 goroutine
p puintptr // 当前绑定的 P
nextp puintptr // 下一个要绑定的 P
oldp puintptr // 系统调用前绑定的 P

// 线程 ID
id int64
mallocing int32
throwing int32
preemptoff string // 如果 != "", 保持 curg 在这个 m 上运行
locks int32 // 锁计数
dying int32
profilehz int32
spinning bool // m 是否在自旋
blocked bool // m 是否被阻塞
newSigstack bool // C 线程上的最小栈

// 系统调用相关
syscalltick uint32
syscallwhen int64
syscallsp uintptr

// 调度相关
mstartfn func() // m 启动时调用的函数
caughtsig guintptr // goroutine 运行在无效的栈上

// 等待相关
park note // 用于阻塞和唤醒
alllink *m // 所有 m 的链表
schedlink muintptr // 空闲 m 链表
lockedg guintptr // 锁定的 goroutine
createstack [32]uintptr // 创建栈的调用栈
lockedExt uint32 // 外部锁计数
lockedInt uint32 // 内部锁计数
nextwaitm muintptr // 下一个等待的 m
waitunlockf func(*g, unsafe.Pointer) bool
waitlock unsafe.Pointer
waittraceev byte
waittraceskip int
startingtrace bool
thread uintptr // 线程句柄
freelink *m // 空闲 m 链表
}

关键字段说明

  • g0: 调度器使用的 goroutine,拥有较大的栈空间,用于执行调度相关代码
  • curg: 当前正在执行的用户 goroutine
  • p: 当前绑定的 P,M 需要 P 才能执行 G
  • spinning: 表示 M 是否在自旋等待工作
  • park: 用于阻塞和唤醒 M

P (processor)

P 表示一个处理器,管理 M 需要运行 G 的相关资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// runtime/runtime2.go

type p struct {
id int32
status uint32 // _Pidle, _Prunning, _Psyscall, _Pgcstop, _Pdead
link puintptr
schedtick uint32 // 每次调度递增
syscalltick uint32 // 每次系统调用递增
sysmontick sysmontick // 最后一次 sysmon 观察到的 tick
m muintptr // 绑定的 m,如果空闲则为 nil
mcache *mcache // 内存分配器缓存
pcache pageCache // 页缓存
racectx uintptr

// 可执行队列
runqhead uint32 // 本地可执行队列头
runqtail uint32 // 本地可执行队列尾
runq [256]guintptr // 本地可执行队列
runnext guintptr // 下一个要执行的 G(高优先级)

// 空闲 G 列表
gFree struct {
gList
n int32
}

// 定时器
timersLock mutex
timers []*timer
numTimers uint32
adjustTimers uint32
deletedTimers uint32

// GC 相关
gcAssistTime int64 // 辅助 GC 时间
gcFractionalMarkTime int64
gcBgMarkWorker guintptr
gcMarkWorkerMode gcMarkWorkerMode

// 其他
preempt bool
pad cpu.CacheLinePad
}

关键字段说明

  • status: P 的状态(_Pidle, _Prunning, _Psyscall, _Pgcstop, _Pdead)
  • m: 当前绑定的 M
  • runq: 本地可执行队列,最多 256 个 G
  • runnext: 下一个要执行的 G(高优先级)
  • mcache: 内存分配器缓存,每个 P 都有独立的缓存
  • gFree: 空闲 G 列表,用于复用 G

schedt (scheduler)

全局调度器结构,管理所有 M、P、G 的全局状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// runtime/runtime2.go

type schedt struct {
// 全局可执行队列
lock mutex
runq gQueue // 全局可执行队列
runqsize int32 // 全局可执行队列大小

// M 相关
midle muintptr // 空闲 M 链表
nmidle int32 // 空闲 M 数量
nmidlelocked int32 // 锁定的空闲 M 数量
mnext int64 // 下一个 M ID
maxmcount int32 // M 的最大数量
nmsys int32 // 系统 M 数量
nmfreed int64 // 释放的 M 数量

// P 相关
pidle puintptr // 空闲 P 链表
npidle uint32 // 空闲 P 数量
nmspinning uint32 // 自旋 M 数量

// G 相关
goidgen uint64 // 下一个 G ID
ngsys uint32 // 系统 G 数量

// GC 相关
gcwaiting uint32 // GC 等待标志
stopwait int32
stopnote note
sysmonwait uint32
sysmonnote note

// 其他
lastpoll uint64 // 最后一次网络轮询时间
pollUntil uint64 // 网络轮询截止时间
safePointFn func(*p)
safePointWait int32
safePointNote note
profilehz int32
}

关键字段说明

  • runq: 全局可执行队列,当本地队列满时,G 会被放入全局队列
  • midle: 空闲 M 链表
  • pidle: 空闲 P 链表
  • goidgen: 用于生成唯一的 G ID
  • gcwaiting: GC 等待标志,当 GC 需要停止所有 M 时设置

sudog (sudog)

sudog 表示等待队列中的一个元素,用于在 channel 操作中等待的 goroutine。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// runtime/runtime2.go

type sudog struct {
// 以下字段受 hchan.lock 保护
g *g
next *sudog
prev *sudog
elem unsafe.Pointer // 数据元素
acquiretime int64
releasetime int64
ticket uint32

// 以下字段不受锁保护
isSelect bool
success bool

parent *sudog // semaRoot 二叉树
waitlink *sudog // g.waiting 链表
waittail *sudog // semaRoot
c *hchan // channel
}

关键字段说明

  • g: 等待的 goroutine
  • elem: 数据元素指针
  • c: 关联的 channel
  • next/prev: 双向链表指针,用于在等待队列中链接
  • isSelect: 是否在 select 语句中
  • success: 操作是否成功

gobuf (gobuf)

gobuf 保存 goroutine 的调度上下文,包括寄存器状态。

1
2
3
4
5
6
7
8
9
10
11
// runtime/runtime2.go

type gobuf struct {
sp uintptr // 栈指针
pc uintptr // 程序计数器
g guintptr // goroutine
ctxt unsafe.Pointer // 上下文
ret uintptr // 返回值
lr uintptr // 链接寄存器(ARM)
bp uintptr // 基址指针(x86)
}

关键字段说明

  • sp: 栈指针,保存 goroutine 的栈顶位置
  • pc: 程序计数器,保存 goroutine 的下一条指令地址
  • g: 关联的 goroutine
  • bp: 基址指针,用于函数调用栈帧

stack (stack)

stack 表示 goroutine 的栈空间。

1
2
3
4
5
6
// runtime/runtime2.go

type stack struct {
lo uintptr // 栈低地址
hi uintptr // 栈高地址
}

说明
在 Go 的实现中,lo 表示栈的低地址,hi 表示高地址。虽然大多数平台的栈空间确实是从高地址向低地址增长(即“压栈”时 SP 递减),但这里的“起始地址”是指“分配的这块栈空间的起始位置”,即一块内存区域的低地址端,而不是说栈的生长方向。

举例说明(假设栈空间分配在 [1000, 2000) 区间):

  • lo = 1000hi = 2000
  • 虽然运行时栈顶(SP 寄存器)从 hilo 逐渐递减(高地址向低地址增长),但 lo 依然是这段内存的起始地址。

所以:

  • lo: 这段栈的最低地址,分配块的起点
  • hi: 这段栈的最高地址,分配块的终点
  • 栈顶(SP)初值是 hi,压栈时向 lo 递减,到达 lo 就栈溢出了

结构体关系图

graph TB
    subgraph "全局调度器"
        S[schedt
全局调度器] S -->|管理| MList[M 链表] S -->|管理| PList[P 链表] S -->|管理| GQueue[全局 G 队列] end subgraph "M - 内核线程" M[M
m0, m1, ...] M -->|绑定| P M -->|执行| G0[g0
调度器 goroutine] M -->|执行| G1[用户 goroutine] end subgraph "P - 处理器" P[P
p0, p1, ...] P -->|管理| LocalQ[本地 G 队列
runq] P -->|管理| FreeG[空闲 G 列表] P -->|管理| MCache[内存缓存] end subgraph "G - Goroutine" G[G
goroutine] G -->|包含| Stack[栈空间] G -->|包含| Sched[调度上下文
gobuf] G -->|等待| Sudog[sudog
等待队列元素] end style S fill:#ffcccc style M fill:#ccffcc style P fill:#ccccff style G fill:#ffffcc

结构体协作关系

  1. M-P-G 关系

    • M 必须绑定 P 才能执行 G
    • P 管理本地 G 队列,M 从 P 的队列中获取 G 执行
    • 一个 M 同时只能执行一个 G
  2. 调度流程

    • M 从 P 的本地队列获取 G
    • 如果本地队列为空,从全局队列获取
    • 如果全局队列也为空,从其他 P 偷取 G
  3. 等待机制

    • G 等待时(如 channel 操作),会创建 sudog 加入等待队列
    • 当条件满足时,通过 sudog 唤醒对应的 G
  4. 栈管理

    • 每个 G 都有独立的栈空间
    • g0 拥有较大的栈,用于执行调度代码
    • 用户 G 的栈可以动态增长

启动

启动汇编函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// runtime/asm_amd64.s

TEXT runtime·rt0_go(SB),NOSPLIT,$0
// ... 省略 ...

// create istack out of the given (operating system) stack.
// _cgo_init may update stackguard.
// 给 runtime.g0 创建栈
MOVQ $runtime·g0(SB), DI
LEAQ (-64*1024+104)(SP), BX
MOVQ BX, g_stackguard0(DI)
MOVQ BX, g_stackguard1(DI)
MOVQ BX, (g_stack+stack_lo)(DI)
MOVQ SP, (g_stack+stack_hi)(DI)

// ... 省略 ...

// set the per-goroutine and per-mach "registers"
get_tls(BX)
LEAQ runtime·g0(SB), CX
MOVQ CX, g(BX)
LEAQ runtime·m0(SB), AX

// save m->g0 = g0
// 绑定 m0 和 g0
MOVQ CX, m_g0(AX)
// save m0 to g0->m
MOVQ AX, g_m(CX)

CLD // convention is D is always left cleared
CALL runtime·check(SB)

MOVL 16(SP), AX // copy argc
MOVL AX, 0(SP)
MOVQ 24(SP), AX // copy argv
MOVQ AX, 8(SP)
CALL runtime·args(SB) // 解析命令行参数
CALL runtime·osinit(SB) // 获取CPU核数
CALL runtime·schedinit(SB) // 初始化调度

// create a new goroutine to start program
MOVQ $runtime·mainPC(SB), AX // entry
PUSHQ AX
PUSHQ $0 // arg size
CALL runtime·newproc(SB)// 执行runtime.main
POPQ AX
POPQ AX

// start this M
CALL runtime·mstart(SB) // 启动m0

CALL runtime·abort(SB) // mstart should never return
RET

rt0_go函数主要流程:

  • 初始化g0m0
  • g0m0互相绑定
  • 初始化相关数据,初始化指定个数的p
  • 创建新g绑定runtime.main函数,加入p的可执行列表中
  • 启动m0开始循环调度。

schedinit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// runtime/proc.go

func schedinit() {
// ... 省略 ...

_g_ := getg() // 获取当前绑定的g

// 限制M的数量
sched.maxmcount = 10000

// ... 省略 ...

// 创建 p
lock(&sched.lock)
sched.lastpoll = uint64(nanotime())
procs := ncpu
if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
procs = n
}
if procresize(procs) != nil {
throw("unknown runnable goroutine during bootstrap")
}
unlock(&sched.lock)
}

schedinit函数主要流程:

  • 初始化全局调度相关值
  • 限制m的最多个数
  • 初始化指定个数的p

newproc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// runtime/proc.go

func newproc(siz int32, fn *funcval) {
argp := add(unsafe.Pointer(&fn), sys.PtrSize)
gp := getg()
pc := getcallerpc()
systemstack(func() {
newg := newproc1(fn, argp, siz, gp, pc)

_p_ := getg().m.p.ptr()
runqput(_p_, newg, true)

if mainStarted { // mainStarted 是在 runtime.main 中设置为 true
wakep() // 尝试找一个p绑定m
}
})
}

newproc函数主要流程:

  • 创建栈大小为siz的新g,并关联fn
  • 将新g存放于_p_的可执行队列中
  • 此时刚初始化,并没有执行runtime.main所以不会执行wakep
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// runtime/proc.go

func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) *g {
_g_ := getg()

acquirem() // disable preemption because it can be holding p in a local var
siz := narg
siz = (siz + 7) &^ 7

_p_ := _g_.m.p.ptr() // 获取 p
newg := gfget(_p_) // 从 p 的空闲 g 列表中获取 g
if newg == nil { // 没有空闲的 g
newg = malg(_StackMin) // 创建一个拥有最小栈的 g
casgstatus(newg, _Gidle, _Gdead) // 转换状态
allgadd(newg) // 向全局 g 列表中添加 g
}

// ... 省略 ...

// 填充 g
memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
newg.sched.sp = sp
newg.stktopsp = sp
newg.sched.pc = funcPC(goexit) + sys.PCQuantum // 当g执行完后的处理函数
newg.sched.g = guintptr(unsafe.Pointer(newg))
gostartcallfn(&newg.sched, fn)
newg.gopc = callerpc
newg.ancestors = saveAncestors(callergp)
newg.startpc = fn.fn
if _g_.m.curg != nil {
newg.labels = _g_.m.curg.labels
}
if isSystemGoroutine(newg, false) {
atomic.Xadd(&sched.ngsys, +1)
}
casgstatus(newg, _Gdead, _Grunnable) // 切换成可执行状态

// 分配goid,如果没有则向p批量获取
if _p_.goidcache == _p_.goidcacheend {
_p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
_p_.goidcache -= _GoidCacheBatch - 1
_p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
}
newg.goid = int64(_p_.goidcache)
_p_.goidcache++

releasem(_g_.m)

return newg
}

newproc1函数主要流程:

  • 从空闲g列表中获取或新建g,将g的信息填充

mstart

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// runtime/proc.go

func mstart() {
_g_ := getg()

// 设置 _g_ 的栈信息
osStack := _g_.stack.lo == 0
if osStack {
size := _g_.stack.hi
if size == 0 {
size = 8192 * sys.StackGuardMultiplier
}
_g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
_g_.stack.lo = _g_.stack.hi - size + 1024
}
_g_.stackguard0 = _g_.stack.lo + _StackGuard
_g_.stackguard1 = _g_.stackguard0

mstart1() // 不会返回
}

mstart函数主要流程:

  • 填充g的栈信息
  • m开始执行g上的代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// runtime/proc.go

func mstart1() {
_g_ := getg()

// ... 省略 ...

if _g_.m == &m0 {
mstartm0() // 启动 m0 初始化信号处理
}

if fn := _g_.m.mstartfn; fn != nil {
fn() // 执行m绑定的启动时调用的函数
}

if _g_.m != &m0 {
acquirep(_g_.m.nextp.ptr())
_g_.m.nextp = 0
}
schedule() // 开始调度
}

mstart1函数主要流程:

  • 如果是m0,则初始化信号处理
  • 如果有mstartfn,则执行
  • 如果不是m0,则绑定p
  • 调用schedule启动golang进程的调度。

runtime.main

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// runtime/proc.go

func main() {
g := getg()

// 确认栈的最大值
if sys.PtrSize == 8 {
maxstacksize = 1000000000
} else {
maxstacksize = 250000000
}

// Allow newproc to start new Ms.
mainStarted = true // 标志 newproc 时可以启动 m

if GOARCH != "wasm" { // no threads on wasm yet, so no sysmon
systemstack(func() {
newm(sysmon, nil, -1) // 新建 m 去执行 sysmon
})
}

lockOSThread()

doInit(&runtime_inittask) // 执行runtime包中的init函数

// Record when the world started.
runtimeInitTime = nanotime()

gcenable() // 开启GC

main_init_done = make(chan bool)
doInit(&main_inittask) // 执行main包中的init函数
close(main_init_done)

unlockOSThread()

fn := main_main // fn 指向main包的main函数
fn() // 执行main包的main函数

exit(0)
}

runtime.main函数主要功能:

  • 设置了栈的最大值
  • 创建m去执行sysmon
  • 调用runtime包的init函数
  • 启动GC
  • 执行main包的init函数
  • 退出

startm

除了m0是汇编初始化的,其他的m都是由startm创建的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func startm(_p_ *p, spinning bool) {
lock(&sched.lock)
if _p_ == nil { // _p_ 为空
_p_ = pidleget() // 从p空闲列表中获取一个
if _p_ == nil { // 获取失败
unlock(&sched.lock)
if spinning {
// 如果是自旋状态,调用方增加了nmspinning,但是没有空闲的P,因此可以取消增量并放弃
if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
throw("startm: negative nmspinning")
}
}
return
}
}
mp := mget() // 从m空闲列表中获取m
if mp == nil { // 如果空闲列表没有
id := mReserveID() // 获取 m id
unlock(&sched.lock)

var fn func()
if spinning {
fn = mspinning // 设置 m 的自旋状态函数
}
newm(fn, _p_, id) // 创建一个m对象
return
}
unlock(&sched.lock)
if mp.spinning {
throw("startm: m is spinning")
}
if mp.nextp != 0 {
throw("startm: m has p")
}
if spinning && !runqempty(_p_) {
throw("startm: p has runnable gs")
}
// 由调用者确定是否自旋,并将m.nextp设置为p
mp.spinning = spinning
mp.nextp.set(_p_)
notewakeup(&mp.park)
}

startm函数主要流程:

  • 获取一个p,失败则返回
    • 获取一个m,如果失败就创建m并返回
  • m暂存p

newm

1
2
3
4
5
6
func newm(fn func(), _p_ *p, id int64) {
mp := allocm(_p_, fn, id) // 创建新m
mp.nextp.set(_p_) // 暂存p
mp.sigmask = initSigmask // 信号掩码
newm1(mp) // 绑定操作系统线程
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func allocm(_p_ *p, fn func(), id int64) *m {
_g_ := getg()
acquirem() // disable GC because it can be called from sysmon
if _g_.m.p == 0 {
acquirep(_p_) // 临时绑定p
}

// 清理可以安全删除的m的g0栈信息
if sched.freem != nil {
lock(&sched.lock)
var newList *m
for freem := sched.freem; freem != nil; {
if freem.freeWait != 0 {
next := freem.freelink
freem.freelink = newList
newList = freem
freem = next
continue
}
stackfree(freem.g0.stack) // 清空freem.g0的栈信息
freem = freem.freelink
}
sched.freem = newList // 更新已被释放的m列表
unlock(&sched.lock)
}

mp := new(m)
mp.mstartfn = fn // 绑定m启动函数
mcommoninit(mp, id) // 绑定mp的id

// 初始化g0栈信息
if iscgo || GOOS == "solaris" || GOOS == "illumos" || GOOS == "windows" || GOOS == "plan9" || GOOS == "darwin" || GOOS == "ios" {
mp.g0 = malg(-1)
} else {
mp.g0 = malg(8192 * sys.StackGuardMultiplier)
}
mp.g0.m = mp

if _p_ == _g_.m.p.ptr() {
releasep() // 解绑p
}
releasem(_g_.m)

return mp
}
1
2
3
4
5
func newm1(mp *m) {
execLock.rlock() // Prevent process clone.
newosproc(mp) // 绑定操作系统线程
execLock.runlock()
}

newm函数主要流程:

  • 释放可以清理的mg0栈空间
  • 新建m,绑定id和启动函数,申请g0栈空间
  • m暂存当前p
  • 创建与m对应的操作系统线程

netpoll

netpoll可以让调度器从就绪的网络事件中获取可执行的goroutine。
由于golang对每个系统的netpoll做了条件编译,这里就拿linux的实现来说明。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func netpoll(delay int64) gList {
var events [128]epollevent
retry:
n := epollwait(epfd, &events[0], int32(len(events)), waitms)

var toRun gList
for i := int32(0); i < n; i++ {
ev := &events[i]
var mode int32
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.everr = false
if ev.events == _EPOLLERR {
pd.everr = true
}
netpollready(&toRun, pd, mode) // 将符合的g填充进toRun中
}
}
return toRun
}

netpoll函数主要流程:

  • 调用epollwait获取就绪的文件描述符
  • pd中的就绪g追加进toRun里面
  • 返回toRun

调度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
// runtime/proc.go

func schedule() {
_g_ := getg()

// 如果_g_绑定的m有锁定的g,则抛弃_g_,转而执行锁定的g
if _g_.m.lockedg != 0 {
stoplockedm()
execute(_g_.m.lockedg.ptr(), false) // Never returns.
}

top:
pp := _g_.m.p.ptr()
pp.preempt = false

// 如果准备GC,则休眠当前m,直到被唤醒
if sched.gcwaiting != 0 {
gcstopm()
goto top
}
if pp.runSafePointFn != 0 {
runSafePointFn()
}

checkTimers(pp, 0)

var gp *g
var inheritTime bool

tryWakeP := false
if trace.enabled || trace.shutdown {
gp = traceReader()
if gp != nil {
casgstatus(gp, _Gwaiting, _Grunnable)
traceGoUnpark(gp, 0)
tryWakeP = true
}
}
if gp == nil && gcBlackenEnabled != 0 {
// 找GCWorker
gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
tryWakeP = tryWakeP || gp != nil
}
if gp == nil {
// 为了让全局可执行队列的g能够运行,这里每操作一定次数就从全局队列中获取
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
if gp == nil {
// 从本地可执行队列中获取
gp, inheritTime = runqget(_g_.m.p.ptr())
}
if gp == nil {
// 从其他地方找一个g来执行,如果没有则阻塞在这里
gp, inheritTime = findrunnable() // blocks until work is available
}

// This thread is going to run a goroutine and is not spinning anymore,
// so if it was marked as spinning we need to reset it now and potentially
// start a new spinning M.
if _g_.m.spinning {
// 如果当前m正在自旋,则重置自旋状态
resetspinning()
}

if tryWakeP {
wakep() // GCworker 或 tracereader 需要唤醒p
}
if gp.lockedm != 0 {
// m将自己的p让给gp锁定的m,自己阻塞等待新p
startlockedm(gp)
goto top
}

execute(gp, inheritTime) // 执行gp
}

schedule函数主要流程:

  • 如果g有绑定的m,则直接让绑定m执行g
  • 如果要GC,则休眠当前m,等待唤醒
  • traceReaderGCWorkerglobrunqgetrunqgetfindrunnable函数中获取一个可执行gp
  • 重置自旋状态
  • 如果需要唤醒p,则尝试唤醒p
  • 如果获取的gp有锁定的m,则让出自己的pgp锁定的m,自己则阻塞等待被唤醒
  • 执行gp

findrunnable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
// runtime/proc.go

func findrunnable() (gp *g, inheritTime bool) {
_g_ := getg()

top:
_p_ := _g_.m.p.ptr()
if sched.gcwaiting != 0 {
gcstopm()
goto top
}
if _p_.runSafePointFn != 0 {
runSafePointFn()
}

now, pollUntil, _ := checkTimers(_p_, 0)

// 如果有finalizer可用,直接唤醒
if fingwait && fingwake {
if gp := wakefing(); gp != nil {
ready(gp, 0, true)
}
}

// 本地获取
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime
}

// 全局获取
// global runq
if sched.runqsize != 0 {
lock(&sched.lock)
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
if gp != nil {
return gp, false
}
}

// 没有可以执行的goroutine

// 获取网络事件完成的gp,优化
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
if list := netpoll(0); !list.empty() { // non-blocking
gp := list.pop()
injectglist(&list)
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
}

// 从其他的P偷取
// Steal work from other P's.
procs := uint32(gomaxprocs)
ranTimer := false

// 将m置为自旋状态
if !_g_.m.spinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}

// 随机从别的p中偷取4次
for i := 0; i < 4; i++ {
for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
if sched.gcwaiting != 0 {
goto top
}
stealRunNextG := i > 2 // first look for ready queues with more than 1 g
p2 := allp[enum.position()]
if _p_ == p2 {
continue
}
if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil {
return gp, false
}

if i > 2 || (i > 1 && shouldStealTimers(p2)) {
tnow, w, ran := checkTimers(p2, now)
now = tnow
if w != 0 && (pollUntil == 0 || w < pollUntil) {
pollUntil = w
}
if ran {
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime
}
ranTimer = true
}
}
}
}
if ranTimer {
// Running a timer may have made some goroutine ready.
goto top
}
// ... 省略 ...
}

findrunnable函数主要流程:

  • 如果有finalizer可执行gp,直接唤醒
  • 如果从本地可执行队列中获取可执行gp,返回gp
  • 如果从全局可执行队列中获取可执行gp,返回gp
  • 如果有就绪的网络事件的gp,返回gp
  • 从其他的p中偷取部分gp,返回gp

runqsteal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// runtime/proc.go

func runqsteal(_p_, p2 *p, stealRunNextG bool) *g {
t := _p_.runqtail
n := runqgrab(p2, &_p_.runq, t, stealRunNextG)
if n == 0 {
return nil
}
n--
gp := _p_.runq[(t+n)%uint32(len(_p_.runq))].ptr()
if n == 0 {
return gp
}
h := atomic.LoadAcq(&_p_.runqhead)
atomic.StoreRel(&_p_.runqtail, t+n)
return gp
}

runqsteal函数主要流程:

  • 获取本地队列队尾坐标
  • p2中获取部分可执行队列
  • 如果只偷取了一个,直接返回
  • 否则需要原子修改可执行队列的首尾指针

runqgrab

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// runtime/proc.go

func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
for {
h := atomic.LoadAcq(&_p_.runqhead)
t := atomic.LoadAcq(&_p_.runqtail)
n := t - h
n = n - n/2
if n == 0 {
if stealRunNextG {
// Try to steal from _p_.runnext.
if next := _p_.runnext; next != 0 {
// 休眠让p不会执行将要偷取的
if _p_.status == _Prunning {
if GOOS != "windows" {
usleep(3)
} else {
osyield()
}
}
if !_p_.runnext.cas(next, 0) {
continue
}
batch[batchHead%uint32(len(batch))] = next
return 1
}
}
return 0
}
if n > uint32(len(_p_.runq)/2) { // 保证队列没有改动
continue
}
// 偷取前半g可执行队列
for i := uint32(0); i < n; i++ {
g := _p_.runq[(h+i)%uint32(len(_p_.runq))]
batch[(batchHead+i)%uint32(len(batch))] = g
}
// 提交本次消费,如果失败则从新再试一次
if atomic.CasRel(&_p_.runqhead, h, h+n) {
return n
}
}
}

runqgrab函数主要流程:

  • 原子获取待偷取p可执行队列首尾位置
  • 如果没有,则判断是否需要偷取next指针的gp
  • 复制p的前半部分可执行队列
  • 原子的修改p的可执行队列的首尾指针位置

M 自旋机制

M 自旋(spinning)是 Go 调度器中的一个重要优化机制,用于在 M 没有可执行的 G 时,不立即进入休眠状态,而是短暂地”自旋”等待,尝试从其他 P 偷取 G 或等待新的 G 出现。

自旋的概念

自旋是指 M 在找不到可执行的 G 时,不立即进入休眠状态,而是保持运行状态,循环尝试获取可执行的 G。这是一种性能优化策略,可以:

  1. 减少上下文切换:避免频繁地让 M 进入休眠和唤醒,减少系统调用开销
  2. 提高响应速度:当新的 G 出现时,自旋的 M 可以立即执行,无需等待唤醒
  3. 平衡负载:自旋的 M 会尝试从其他 P 偷取 G,实现负载均衡

自旋状态的管理

自旋状态的设置

findrunnable 函数中,当 M 开始尝试从其他 P 偷取 G 时,会设置自旋状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// runtime/proc.go

func findrunnable() (gp *g, inheritTime bool) {
// ... 省略前面的代码 ...

// 将m置为自旋状态
if !_g_.m.spinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1) // 全局自旋 M 计数加 1
}

// 随机从别的p中偷取4次
for i := 0; i < 4; i++ {
// ... 偷取逻辑 ...
}
}

关键点

  • _g_.m.spinning:M 的本地自旋标志
  • sched.nmspinning:全局自旋 M 计数器,用于控制自旋 M 的数量

自旋状态的重置

当 M 找到可执行的 G 后,需要重置自旋状态。这发生在 schedule 函数中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// runtime/proc.go

func schedule() {
// ... 省略前面的代码 ...

// 从 findrunnable 获取 G
if gp == nil {
gp, inheritTime = findrunnable() // blocks until work is available
}

// This thread is going to run a goroutine and is not spinning anymore,
// so if it was marked as spinning we need to reset it now and potentially
// start a new spinning M.
if _g_.m.spinning {
// 如果当前m正在自旋,则重置自旋状态
resetspinning()
}

// ... 执行 G ...
}

resetspinning 函数的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// runtime/proc.go

func resetspinning() {
_g_ := getg()
if !_g_.m.spinning {
throw("resetspinning: not a spinning m")
}
_g_.m.spinning = false
nmspinning := atomic.Xadd(&sched.nmspinning, -1)
if int32(nmspinning) < 0 {
throw("findrunnable: negative nmspinning")
}
// M 停止自旋后,如果还有可执行的 G,尝试唤醒新的 M
if nmspinning == 0 && atomic.Load(&sched.npidle) > 0 {
wakep()
}
}

重置流程

  1. 检查 M 确实在自旋状态
  2. 清除 M 的本地自旋标志
  3. 全局自旋计数减 1
  4. 如果全局没有自旋的 M 且有空闲的 P,尝试唤醒新的 M

自旋的触发条件

M 进入自旋状态需要满足以下条件:

  1. 本地队列为空:当前 P 的本地运行队列没有可执行的 G
  2. 全局队列为空:全局运行队列也没有可执行的 G
  3. 网络轮询无结果:没有就绪的网络事件
  4. 需要尝试工作窃取:准备从其他 P 偷取 G

自旋时的行为

当 M 处于自旋状态时,会执行以下操作:

  1. 工作窃取(Work Stealing)

    • 随机选择其他 P
    • 尝试从选中的 P 的本地队列偷取一半的 G
    • 最多尝试 4 轮,每轮遍历所有 P
  2. 检查定时器

    • 检查其他 P 的定时器
    • 如果有到期的定时器,执行定时器回调
    • 定时器回调可能会创建新的可执行 G
  3. 检查网络事件

    • 检查是否有就绪的网络 IO 事件
    • 如果有,将对应的 G 唤醒

自旋的退出条件

M 会在以下情况下退出自旋状态:

  1. 找到可执行的 G

    • 从其他 P 偷取到 G
    • 定时器回调创建了新的 G
    • 网络事件就绪,对应的 G 被唤醒
  2. 进入休眠

    • 如果长时间找不到可执行的 G,M 会退出自旋并进入休眠
    • 休眠的 M 会被放入空闲 M 列表
  3. GC 等待

    • 如果系统需要 GC,自旋的 M 会停止自旋并等待 GC 完成

自旋数量的控制

Go 调度器通过 sched.nmspinning 控制全局自旋 M 的数量,避免过多的 M 同时自旋浪费 CPU:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// runtime/proc.go

func handoffp(_p_ *p) {
// ... 省略前面的代码 ...

// 如果没有自旋的m和空闲的p,并且增加自旋数成功,则让_p_绑定一个m进入自旋
if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 &&
atomic.Cas(&sched.nmspinning, 0, 1) {
startm(_p_, true) // 创建自旋的 M
return
}

// ... 省略后面的代码 ...
}

控制策略

  • 只有当全局没有自旋的 M 且没有空闲的 P 时,才创建新的自旋 M
  • 使用原子操作确保只有一个 M 能成功进入自旋
  • 避免过多的 M 同时自旋,浪费 CPU 资源

自旋与性能优化

自旋机制是 Go 调度器性能优化的重要组成部分:

  1. 减少延迟

    • 当新的 G 出现时,自旋的 M 可以立即执行,无需等待唤醒
    • 避免了线程休眠和唤醒的系统调用开销
  2. 提高吞吐量

    • 自旋的 M 会主动从其他 P 偷取 G,实现负载均衡
    • 充分利用多核 CPU 资源
  3. 平衡开销

    • 通过控制自旋 M 的数量,在响应速度和 CPU 利用率之间取得平衡
    • 避免过多的自旋浪费 CPU 资源

自旋流程图

flowchart TD
    A[M 找不到可执行的 G] --> B{本地队列为空?}
    B -->|否| C[从本地队列获取 G]
    B -->|是| D{全局队列为空?}
    D -->|否| E[从全局队列获取 G]
    D -->|是| F{网络事件就绪?}
    F -->|是| G[唤醒网络 G]
    F -->|否| H[设置自旋状态]
    H --> I[增加全局自旋计数]
    I --> J[尝试从其他 P 偷取 G]
    J --> K{偷取成功?}
    K -->|是| L[重置自旋状态]
    K -->|否| M{还有 P 未检查?}
    M -->|是| J
    M -->|否| N[退出自旋,进入休眠]
    L --> O[执行 G]
    C --> O
    E --> O
    G --> O
    
    style H fill:#ffcccc
    style I fill:#ffcccc
    style L fill:#ccffcc
    style N fill:#ffffcc

总结

M 自旋是 Go 调度器的一个重要优化机制,它通过让 M 在找不到可执行的 G 时短暂自旋,而不是立即休眠,来提高系统的响应速度和吞吐量。自旋机制需要平衡响应速度和 CPU 利用率,通过控制自旋 M 的数量来实现这一平衡。

execute

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// runtime/proc.go

func execute(gp *g, inheritTime bool) {
_g_ := getg()

// 互相绑定 _g_.m.curg = gp
gp.m = _g_.m
casgstatus(gp, _Grunnable, _Grunning) // 转换状态
gp.waitsince = 0
gp.preempt = false
gp.stackguard0 = gp.stack.lo + _StackGuard
if !inheritTime {
_g_.m.p.ptr().schedtick++
}

gogo(&gp.sched)
}

execute函数主要流程:

  • mg相互绑定
  • 设置相关值
  • 调用gogo函数执行gp

gogo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// runtime/asm_amd64.s

TEXT runtime·gogo(SB), NOSPLIT, $16-8
MOVQ buf+0(FP), BX // gobuf
MOVQ gobuf_g(BX), DX
MOVQ 0(DX), CX // make sure g != nil
get_tls(CX)
MOVQ DX, g(CX)
MOVQ gobuf_sp(BX), SP // restore SP
MOVQ gobuf_ret(BX), AX
MOVQ gobuf_ctxt(BX), DX
MOVQ gobuf_bp(BX), BP
MOVQ $0, gobuf_sp(BX) // clear to help garbage collector
MOVQ $0, gobuf_ret(BX)
MOVQ $0, gobuf_ctxt(BX)
MOVQ $0, gobuf_bp(BX)
MOVQ gobuf_pc(BX), BX
JMP BX

gogo函数主要流程:

  • gobuf的内容存放到相关寄存器中
  • gobuf的内容清空
  • 执行gobuf.pc

Gosched

除了上述的通过运行时启动调度之外,golang还提供了手动的调度函数Gosched函数,该函数在运行时内外都可以触发下一次调度。

1
2
3
4
func Gosched() {
checkTimeouts()
mcall(gosched_m)
}
1
2
3
func gosched_m(gp *g) {
goschedImpl(gp)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
func goschedImpl(gp *g) {
status := readgstatus(gp)
if status&^_Gscan != _Grunning {
dumpgstatus(gp)
throw("bad g status")
}
casgstatus(gp, _Grunning, _Grunnable)
dropg() // 解绑g和m
lock(&sched.lock)
globrunqput(gp) // 放入全局可执行队列
unlock(&sched.lock)

schedule() // 下一次调度
}

Gosched函数主要流程:

  • 获取gp的状态
  • 切换gp的状态为_Grunnable
  • 解绑g和m
  • 将g存入全局可执行队列中
  • 启动下一次调度

销毁

1
2
3
4
5
TEXT runtime·goexit(SB),NOSPLIT,$0-0
BYTE $0x90 // NOP
CALL runtime·goexit1(SB) // does not return
// traceback from goexit1 must hit code range of goexit
BYTE $0x90 // NOP
1
2
3
4
5
6
7
8
9
func goexit1() {
if raceenabled {
racegoend()
}
if trace.enabled {
traceGoEnd()
}
mcall(goexit0)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func goexit0(gp *g) {
_g_ := getg()

// 切换g的状态
casgstatus(gp, _Grunning, _Gdead)
// 标记系统goroutine
if isSystemGoroutine(gp, false) {
atomic.Xadd(&sched.ngsys, -1)
}
// 清理gp相关的数据
gp.m = nil
locked := gp.lockedm != 0
gp.lockedm = 0
_g_.m.lockedg = 0
gp.preemptStop = false
gp.paniconfault = false
gp._defer = nil // should be true already but just in case.
gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.
gp.writebuf = nil
gp.waitreason = 0
gp.param = nil
gp.labels = nil
gp.timer = nil

dropg() // 解绑当前m和gp

if GOARCH == "wasm" { // no threads yet on wasm
gfput(_g_.m.p.ptr(), gp) // 将gp存放到p的空闲列表中
schedule() // 下一次调度
}

gfput(_g_.m.p.ptr(), gp) // 将gp存放到p的空闲列表中
if locked {
// 如果gp锁定了m,则将这个m杀死
if GOOS != "plan9" {
gogo(&_g_.m.g0.sched)
} else {
_g_.m.lockedExt = 0
}
}
schedule() // 下一次调度
}

goexit0函数主要流程:

  • 切换g的状态
  • 解绑g所有绑定的数据
  • 如果是wasm架构,直接将g存于空闲列表中,并开始下一次调度
  • 否则,直接将g存于空闲列表中,如果g有锁定的m,则将m杀死,开始下一次调度

切换

执行完毕切换

上面说到,当goroutine执行完毕时,会执行goexit0函数,进而执行下一次调度

主动切换

当goroutine中阻塞的操作时,就需要让出CPU,让其他的goroutine执行。所有主动切换都是调用gopark函数来实现的。

gopark

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
if reason != waitReasonSleep {
checkTimeouts()
}
mp := acquirem()
gp := mp.curg
status := readgstatus(gp)
if status != _Grunning && status != _Gscanrunning {
throw("gopark: bad g status")
}
// 填充相关参数
mp.waitlock = lock
mp.waitunlockf = unlockf
gp.waitreason = reason
mp.waittraceev = traceEv
mp.waittraceskip = traceskip
releasem(mp)
// can't do anything that might move the G between Ms here.
mcall(park_m)
}

gopark函数主要流程:

  • 获取当前m绑定的gp
  • 填充相关参数
  • 利用g0调用park_m函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func park_m(gp *g) {
_g_ := getg()

casgstatus(gp, _Grunning, _Gwaiting)
dropg() // 解绑g和m

if fn := _g_.m.waitunlockf; fn != nil {
ok := fn(gp, _g_.m.waitlock) // 尝试调用解锁函数
_g_.m.waitunlockf = nil
_g_.m.waitlock = nil
if !ok {
// 如果解锁成功
casgstatus(gp, _Gwaiting, _Grunnable)
execute(gp, true) // 直接执行gp
}
}
schedule() // 下一次调度
}

park_m函数主要流程:

  • 切换g的状态为等待
  • 尝试解锁,如果成功则切换状态为可执行,直接调用execute函数执行
  • 否则,进入下一个调度

goready

当goroutine通过gopark函数由_Grunning_Gwaiting,反向操作goready函数则是将_Gwaiting_Grunnable

1
2
3
4
5
func goready(gp *g, traceskip int) {
systemstack(func() {
ready(gp, traceskip, true)
})
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func ready(gp *g, traceskip int, next bool) {
status := readgstatus(gp)

_g_ := getg()
mp := acquirem()
// 如果不是 _Gwaiting 抛异常
if status&^_Gscan != _Gwaiting {
dumpgstatus(gp)
throw("bad g->status in ready")
}

// 切换状态
casgstatus(gp, _Gwaiting, _Grunnable)
runqput(_g_.m.p.ptr(), gp, next) // 存放到本地可执行队列中
wakep() // 尝试去唤起p去执行
releasem(mp)
}

ready函数主要流程:

  • 获取gp的状态
  • 获取当前的g
  • 检测gp状态是不是_Gwaiting
  • _Gwaiting转换为_Grunnable状态
  • 放进当前gmp中的本地可执行队列中

抢占切换

golang调度本质上是非抢占式的,golang利用标志位标志当前的goroutine是否可以被抢占,而触发时机是在栈扩容的时候。
golang中有个监控函数,监控着整个进程运行的相关数据,其中就包括检查某个goroutine是否占用CPU时间过长,从而进行标记抢占标记位。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func sysmon() {
// ... 省略 ...

for {
// ... 省略 ...

// 解绑在陷入系统调用中的p,和抢占长时间运行的g
if retake(now) != 0 {
idle = 0
} else {
idle++
}

// ... 省略 ...
}
}

retake

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
func retake(now int64) uint32 {
n := 0
lock(&allpLock)
for i := 0; i < len(allp); i++ {
_p_ := allp[i]
if _p_ == nil {
continue
}
pd := &_p_.sysmontick // sysmon 信息记录
s := _p_.status
sysretake := false
if s == _Prunning || s == _Psyscall {
// 处于 _Prunning 或者 _Psyscall 状态时,如果上一次触发调度的时间已经过去了 10ms,
// 我们就会通过 runtime.preemptone 抢占当前处理器
// 如果G运行时间太长则抢占G
t := int64(_p_.schedtick)
if int64(pd.schedtick) != t {
pd.schedtick = uint32(t)
pd.schedwhen = now
} else if pd.schedwhen+forcePreemptNS <= now {
preemptone(_p_)
// 在_Psyscall时preemptone函数不会工作,因为m没有绑定p
sysretake = true
}
}
if s == _Psyscall {
// 当处理器处于 _Psyscall 状态时
// 当处理器的运行队列不为空或者不存在空闲处理器时并且当系统调用时间超过了 10ms 时
t := int64(_p_.syscalltick)
if !sysretake && int64(pd.syscalltick) != t {
pd.syscalltick = uint32(t)
pd.syscallwhen = now
continue
}

if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
// 如果_p_没有可执行的g,且有自旋的m或空闲的p,且系统调用时间没有超过10ms
continue
}
// Drop allpLock so we can take sched.lock.
unlock(&allpLock)

// 将p的状态设置为_Pidle,计数器n加1,_p_的系统调用次数+1
incidlelocked(-1)
if atomic.Cas(&_p_.status, s, _Pidle) {
n++
_p_.syscalltick++
handoffp(_p_) // 让 p 去找其他的事情干
}
incidlelocked(1)
lock(&allpLock)
}
}
unlock(&allpLock)
return uint32(n)
}

retake函数主要流程:

  • 遍历所有的p
  • 如果p长时间没有调度则标记抢占标志位
  • 如果p在系统调用中,且超过阈值时间,则解绑p
  • 返回解绑p的个数

preemptone

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func preemptone(_p_ *p) bool {
mp := _p_.m.ptr()
if mp == nil || mp == getg().m {
// 如果mp为空,或mp是当前运行的m
return false
}
gp := mp.curg
if gp == nil || gp == mp.g0 {
// gp 不能使 g0
return false
}

gp.preempt = true // 标志gp可以被抢占

gp.stackguard0 = stackPreempt // 直接设置为栈顶,方便触发栈扩容

// Request an async preemption of this P.
if preemptMSupported && debug.asyncpreemptoff == 0 {
_p_.preempt = true // 标记p快速调度
preemptM(mp) // 向mp发送抢占信号
}

return true
}

handoffp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
func handoffp(_p_ *p) {
// 如果本地有可执行的G或全局可执行队列长度不为0,则直接开始执行
if !runqempty(_p_) || sched.runqsize != 0 {
startm(_p_, false)
return
}
// 如果可以执行GC,则立即执行
if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) {
startm(_p_, false)
return
}
// 如果没有自旋的m和空闲的p,并且增加自旋数成功,则让_p_绑定一个m进入自旋
if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) {
startm(_p_, true)
return
}
lock(&sched.lock)
if sched.gcwaiting != 0 { // 即将GC
_p_.status = _Pgcstop
sched.stopwait--
if sched.stopwait == 0 {
notewakeup(&sched.stopnote)
}
unlock(&sched.lock)
return
}
if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0) {
sched.safePointFn(_p_)
sched.safePointWait--
if sched.safePointWait == 0 {
notewakeup(&sched.safePointNote)
}
}
// 此时如果全局队列有可执行的g,则执行
if sched.runqsize != 0 {
unlock(&sched.lock)
startm(_p_, false)
return
}
// 如果这是最后运行的P并且没有人正在轮询网络,则需要唤醒另一个M来轮询网络。
if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {
unlock(&sched.lock)
startm(_p_, false)
return
}
if when := nobarrierWakeTime(_p_); when != 0 {
wakeNetPoller(when)
}
// 都没有则将_p_存放到空闲P列表中
pidleput(_p_)
unlock(&sched.lock)
}

retake函数主要流程:

  • 如果p的本地可执行队列不为空,或全局可执行队列不为空,则绑定m去执行
  • 如果p可以执行GC工作,则绑定m去执行
  • 如果没有m在自旋且没有空闲的p,且成功设置自旋值,则获取一个m,进入自旋
  • 如果此时在检测全局可执行队列是否为空,有则绑定m去执行
  • 如果是最后一个正在运行的p,则绑定m去轮询网络
  • 都没有则将p存放进空闲p列表

newstack

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func newstack() {
thisg := getg() // 当前执行的g

gp := thisg.m.curg // m绑定的g

// 判断是否抢占触发的栈扩张
preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt

// ... 省略 ...

if preempt {
// Act like goroutine called runtime.Gosched.
gopreempt_m(gp) // never return
}
// ... 省略 ...
}
1
2
3
4
5
6
func gopreempt_m(gp *g) {
if trace.enabled {
traceGoPreempt()
}
goschedImpl(gp)
}

goschedImpl函数就是上述Gosched函数的主要执行实体了。

如果检测到是抢占,则将m绑定的g放入全局可执行队列中。

系统调用切换

golang提供了系统调用接口:

1
2
func Syscall(trap, a1, a2, a3 uintptr) (r1, r2 uintptr, err Errno)
func RawSyscall(trap, a1, a2, a3 uintptr) (r1, r2 uintptr, err Errno)

更多参数可以调用Syscall6或Syscall9。

Syscall

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
TEXT	·Syscall(SB),NOSPLIT,$0-56
CALL runtime·entersyscall(SB)
MOVQ a1+8(FP), DI
MOVQ a2+16(FP), SI
MOVQ a3+24(FP), DX
MOVQ trap+0(FP), AX // syscall entry
ADDQ $0x2000000, AX
SYSCALL
JCC ok
MOVQ $-1, r1+32(FP)
MOVQ $0, r2+40(FP)
MOVQ AX, err+48(FP)
CALL runtime·exitsyscall(SB)
RET
ok:
MOVQ AX, r1+32(FP)
MOVQ DX, r2+40(FP)
MOVQ $0, err+48(FP)
CALL runtime·exitsyscall(SB)
RET

Syscall函数主要流程:

  • 调用runtime.entersyscall
  • 将参数存至寄存器
  • 执行系统调用
  • 将返回值压栈
  • 调用runtime.exitsyscall

entersyscall

1
2
3
func entersyscall() {
reentersyscall(getcallerpc(), getcallersp())
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
func reentersyscall(pc, sp uintptr) {
_g_ := getg()
_g_.m.locks++

_g_.stackguard0 = stackPreempt // 等待被抢占
_g_.throwsplit = true

// Leave SP around for GC and traceback.
save(pc, sp) // 保存现场
_g_.syscallsp = sp
_g_.syscallpc = pc
casgstatus(_g_, _Grunning, _Gsyscall) // 切换状态
if _g_.syscallsp < _g_.stack.lo || _g_.stack.hi < _g_.syscallsp {
systemstack(func() {
print("entersyscall inconsistent ", hex(_g_.syscallsp), " [", hex(_g_.stack.lo), ",", hex(_g_.stack.hi), "]\n")
throw("entersyscall")
})
}

if trace.enabled {
systemstack(traceGoSysCall)
// systemstack itself clobbers g.sched.{pc,sp} and we might
// need them later when the G is genuinely blocked in a
// syscall
save(pc, sp)
}

if atomic.Load(&sched.sysmonwait) != 0 {
systemstack(entersyscall_sysmon)
save(pc, sp)
}

if _g_.m.p.ptr().runSafePointFn != 0 {
// runSafePointFn may stack split if run on this stack
systemstack(runSafePointFn)
save(pc, sp)
}

_g_.m.syscalltick = _g_.m.p.ptr().syscalltick
_g_.sysblocktraced = true
pp := _g_.m.p.ptr()
pp.m = 0
_g_.m.oldp.set(pp)
_g_.m.p = 0
atomic.Store(&pp.status, _Psyscall) // 切换p的状态
if sched.gcwaiting != 0 {
systemstack(entersyscall_gcwait)
save(pc, sp)
}

_g_.m.locks--
}

exitsyscall

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func exitsyscall() {
_g_ := getg()

_g_.m.locks++

oldp := _g_.m.oldp.ptr()
_g_.m.oldp = 0
if exitsyscallfast(oldp) { // 尝试获取系统调用前绑定的p
_g_.m.p.ptr().syscalltick++
casgstatus(_g_, _Gsyscall, _Grunning)
_g_.syscallsp = 0
_g_.m.locks--
if _g_.preempt {
// 如果抢占,就设置stackguard0为stackPreempt
_g_.stackguard0 = stackPreempt
} else {
// 否则恢复真实栈帧
_g_.stackguard0 = _g_.stack.lo + _StackGuard
}
_g_.throwsplit = false

if sched.disable.user && !schedEnabled(_g_) {
// Scheduling of this goroutine is disabled.
Gosched() // 开始调度
}

return
}

// 没有p被绑定的情况
_g_.sysexitticks = 0
_g_.m.locks--

// Call the scheduler.
mcall(exitsyscall0)

_g_.syscallsp = 0
_g_.m.p.ptr().syscalltick++
_g_.throwsplit = false
}

exitsyscall函数主要流程:

  • 获取系统调用前绑定的oldp
  • 尝试获取oldp或从空闲列表获取p
  • 如果成功获取p,改变相关设置,开始下一轮调度
  • 如果没有获取,则调用exitsyscall0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func exitsyscall0(gp *g) {
_g_ := getg()

casgstatus(gp, _Gsyscall, _Grunnable)
dropg()
lock(&sched.lock)
var _p_ *p
if schedEnabled(_g_) { // 如果可以调度_g_
_p_ = pidleget() // 从p空闲列表中获取p
}
if _p_ == nil {
globrunqput(gp) // 没有可用的p,就将gp存放于全局可执行列表中
} else if atomic.Load(&sched.sysmonwait) != 0 {
atomic.Store(&sched.sysmonwait, 0)
notewakeup(&sched.sysmonnote)
}
unlock(&sched.lock)
if _p_ != nil { // 如果有可用的p
acquirep(_p_) // 直接绑定当前的m
execute(gp, false) // 执行gp
}
if _g_.m.lockedg != 0 { // 如果m有锁定的g
// Wait until another thread schedules gp and so m again.
stoplockedm() // 释放p,休眠m,会阻塞
execute(gp, false) // 执行gp
}
stopm() // 将m休眠,并存于m空闲列表中,会阻塞
schedule() // 下一次调度
}

exitsyscall0函数主要流程:

  • 切换gp状态为_Grunnable
  • 解绑gm
  • 尝试获取一个空闲的_p_
  • 如果没有获取到就把gp放到全局可执行列表中
  • 如果获取到了,就直接绑定当前的m,执行gp
  • 如果有m有锁定的g,释放p,休眠m,等待被唤醒
  • 否则m将放置与全局m空闲列表中,等待下一次调度

exitsyscall0函数如果没有p则会将m休眠

RawSyscall

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
TEXT ·RawSyscall(SB),NOSPLIT,$0-56
MOVQ a1+8(FP), DI
MOVQ a2+16(FP), SI
MOVQ a3+24(FP), DX
MOVQ trap+0(FP), AX // syscall entry
ADDQ $0x2000000, AX
SYSCALL
JCC ok1
MOVQ $-1, r1+32(FP)
MOVQ $0, r2+40(FP)
MOVQ AX, err+48(FP)
RET
ok1:
MOVQ AX, r1+32(FP)
MOVQ DX, r2+40(FP)
MOVQ $0, err+48(FP)
RET

RawSyscall函数主要流程:

  • 将参数存至寄存器
  • 执行系统调用
  • 将返回值压栈

RawSyscall函数并没有执行runtime.entersyscallruntime.exitsyscall函数,由于没有执行相关操作导致golang无法准确的调度,可能会导致长时间系统调用,其他的goroutine无法得到执行。

辅助函数

getg

获取当前的g,由于是编译器填充的,所以没有源码。
一般都是从TLS寄存器获取的。

mcall

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
TEXT runtime·mcall(SB), NOSPLIT, $0-8
MOVQ fn+0(FP), DI

get_tls(CX)
MOVQ g(CX), AX // save state in g->sched
MOVQ 0(SP), BX // caller's PC
MOVQ BX, (g_sched+gobuf_pc)(AX)
LEAQ fn+0(FP), BX // caller's SP
MOVQ BX, (g_sched+gobuf_sp)(AX)
MOVQ AX, (g_sched+gobuf_g)(AX)
MOVQ BP, (g_sched+gobuf_bp)(AX)

// switch to m->g0 & its stack, call fn
MOVQ g(CX), BX
MOVQ g_m(BX), BX
MOVQ m_g0(BX), SI
CMPQ SI, AX // if g == m->g0 call badmcall
JNE 3(PC)
MOVQ $runtime·badmcall(SB), AX
JMP AX
MOVQ SI, g(CX) // g = m->g0
MOVQ (g_sched+gobuf_sp)(SI), SP // sp = m->g0->sched.sp
PUSHQ AX
MOVQ DI, DX
MOVQ 0(DI), DI
CALL DI // 执行fn,不能返回
POPQ AX
MOVQ $runtime·badmcall2(SB), AX
JMP AX
RET

mcall函数切换到mg0调用fn(g)fn是不能返回的。

systemstack

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
TEXT runtime·systemstack(SB), NOSPLIT, $0-8
MOVQ fn+0(FP), DI // DI = fn
get_tls(CX)
MOVQ g(CX), AX // AX = g
MOVQ g_m(AX), BX // BX = m

CMPQ AX, m_gsignal(BX) // g == m.gsignal
JEQ noswitch

MOVQ m_g0(BX), DX // DX = g0
CMPQ AX, DX // g == g0
JEQ noswitch

CMPQ AX, m_curg(BX) // g == m.curg
JNE bad

// 保存当前g的栈信息
MOVQ $runtime·systemstack_switch(SB), SI
MOVQ SI, (g_sched+gobuf_pc)(AX)
MOVQ SP, (g_sched+gobuf_sp)(AX)
MOVQ AX, (g_sched+gobuf_g)(AX)
MOVQ BP, (g_sched+gobuf_bp)(AX)

// 切换到g0
MOVQ DX, g(CX)
MOVQ (g_sched+gobuf_sp)(DX), BX
// make it look like mstart called systemstack on g0, to stop traceback
SUBQ $8, BX
MOVQ $runtime·mstart(SB), DX
MOVQ DX, 0(BX)
MOVQ BX, SP

// call target function
MOVQ DI, DX
MOVQ 0(DI), DI
CALL DI // 执行目标函数

// 恢复原有g
get_tls(CX)
MOVQ g(CX), AX
MOVQ g_m(AX), BX
MOVQ m_curg(BX), AX
MOVQ AX, g(CX)
MOVQ (g_sched+gobuf_sp)(AX), SP
MOVQ $0, (g_sched+gobuf_sp)(AX)
RET

systemstack函数主要流程:

  • 检测相关参数
  • 保存g现场
  • 切换到g0,并执行fn(g)
  • 恢复原有g

acquirem

1
2
3
4
5
func acquirem() *m {
_g_ := getg()
_g_.m.locks++
return _g_.m
}

acquirem函数主要是增加locks引用计数,并返回当前的m。主要是防止GC回收m。

releasem

1
2
3
4
5
6
7
func releasem(mp *m) {
_g_ := getg()
mp.locks--
if mp.locks == 0 && _g_.preempt {
_g_.stackguard0 = stackPreempt
}
}

releasem函数主要是减少locks引用计数,并判断是否需要g被抢占

acquirep

1
2
3
func acquirep(_p_ *p) {
wirep(_p_)
}
1
2
3
4
5
6
func wirep(_p_ *p) {
_g_ := getg()
_g_.m.p.set(_p_)
_p_.m.set(_g_.m)
_p_.status = _Prunning
}

acquirep函数主要流程:

  • 绑定pm
  • p的状态置为_Prunning

releasep

1
2
3
4
5
6
7
8
func releasep() *p {
_g_ := getg()
_p_ := _g_.m.p.ptr()
_g_.m.p = 0
_p_.m = 0
_p_.status = _Pidle
return _p_
}

releasep函数主要流程:

  • 解绑mp
  • p的状态置为_Pidle
  • 返回p

疑问

p的本地可执行列表无锁,其他p怎么偷取可执行列表

通过原子cas的方式提交列表头尾位置,如果失败则重新偷取。

g进入_Gwaiting状态后去哪里了

  • 如果g是被抢占了,则将g的状态改为_Grunnable,放入全局可执行队列中
  • 如果是主动切换,调用gopark的调用者需要维护sudog列表(sudog用于保存调用goparkg),接收已完成的goroutine,然后调用goready,将他们状态置为_Grunnable,存入本地的可执行队列中。

m进入自旋,在干嘛

M 自旋是指 M 在找不到可执行的 G 时,不立即进入休眠状态,而是保持运行状态,循环尝试获取可执行的 G。

自旋时的行为

  1. 工作窃取:从其他 P 的本地队列偷取 G
  2. 检查定时器:检查并执行到期的定时器,定时器回调可能创建新的 G
  3. 检查网络事件:检查是否有就绪的网络 IO 事件
  4. 循环尝试:最多尝试 4 轮,每轮遍历所有 P

自旋的退出条件

  • 成功偷取到 G
  • 定时器回调创建了新的 G
  • 网络事件就绪,对应的 G 被唤醒
  • 长时间找不到 G,退出自旋进入休眠

自旋的作用

  • 减少线程休眠和唤醒的开销
  • 提高新 G 出现时的响应速度
  • 通过工作窃取实现负载均衡

详细说明请参考前面的 “M 自旋机制” 章节。

g0栈复用

g0的栈在golang中不同系统采用不同的初始化方式。

1
2
3
4
5
6
if iscgo || GOOS == "solaris" || GOOS == "illumos" || GOOS == "windows" || GOOS == "plan9" || GOOS == "darwin" || GOOS == "ios" {
// 如果是上面的情况 g0栈是用的 pthread_create 线程栈
mp.g0 = malg(-1)
} else {
mp.g0 = malg(8192 * sys.StackGuardMultiplier)
}

每次切到g0栈执行指令时,g0->sched.sp在初始化后没有修改该过,所以每次切换到g0时栈起始值相同,每次调用mcall都会从指定栈位置开始执行相关操作,以此来复用g0栈。

参考文献


文章作者: djaigo
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 djaigo !
评论
  目录