type gobuf struct { sp uintptr// 栈指针 pc uintptr// 程序计数器 g guintptr // goroutine ctxt unsafe.Pointer // 上下文 ret uintptr// 返回值 lr uintptr// 链接寄存器(ARM) bp uintptr// 基址指针(x86) }
关键字段说明:
sp: 栈指针,保存 goroutine 的栈顶位置
pc: 程序计数器,保存 goroutine 的下一条指令地址
g: 关联的 goroutine
bp: 基址指针,用于函数调用栈帧
stack (stack)
stack 表示 goroutine 的栈空间。
1 2 3 4 5 6
// runtime/runtime2.go
type stack struct { lo uintptr// 栈低地址 hi uintptr// 栈高地址 }
说明: 在 Go 的实现中,lo 表示栈的低地址,hi 表示高地址。虽然大多数平台的栈空间确实是从高地址向低地址增长(即“压栈”时 SP 递减),但这里的“起始地址”是指“分配的这块栈空间的起始位置”,即一块内存区域的低地址端,而不是说栈的生长方向。
举例说明(假设栈空间分配在 [1000, 2000) 区间):
lo = 1000,hi = 2000
虽然运行时栈顶(SP 寄存器)从 hi 向 lo 逐渐递减(高地址向低地址增长),但 lo 依然是这段内存的起始地址。
所以:
lo: 这段栈的最低地址,分配块的起点
hi: 这段栈的最高地址,分配块的终点
栈顶(SP)初值是 hi,压栈时向 lo 递减,到达 lo 就栈溢出了
结构体关系图
graph TB
subgraph "全局调度器"
S[schedt 全局调度器]
S -->|管理| MList[M 链表]
S -->|管理| PList[P 链表]
S -->|管理| GQueue[全局 G 队列]
end
subgraph "M - 内核线程"
M[M m0, m1, ...]
M -->|绑定| P
M -->|执行| G0[g0 调度器 goroutine]
M -->|执行| G1[用户 goroutine]
end
subgraph "P - 处理器"
P[P p0, p1, ...]
P -->|管理| LocalQ[本地 G 队列 runq]
P -->|管理| FreeG[空闲 G 列表]
P -->|管理| MCache[内存缓存]
end
subgraph "G - Goroutine"
G[G goroutine]
G -->|包含| Stack[栈空间]
G -->|包含| Sched[调度上下文 gobuf]
G -->|等待| Sudog[sudog 等待队列元素]
end
style S fill:#ffcccc
style M fill:#ccffcc
style P fill:#ccccff
style G fill:#ffffcc
// 创建 p lock(&sched.lock) sched.lastpoll = uint64(nanotime()) procs := ncpu if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 { procs = n } if procresize(procs) != nil { throw("unknown runnable goroutine during bootstrap") } unlock(&sched.lock) }
acquirem() // disable preemption because it can be holding p in a local var siz := narg siz = (siz + 7) &^ 7 _p_ := _g_.m.p.ptr() // 获取 p newg := gfget(_p_) // 从 p 的空闲 g 列表中获取 g if newg == nil { // 没有空闲的 g newg = malg(_StackMin) // 创建一个拥有最小栈的 g casgstatus(newg, _Gidle, _Gdead) // 转换状态 allgadd(newg) // 向全局 g 列表中添加 g }
funcstartm(_p_ *p, spinning bool) { lock(&sched.lock) if _p_ == nil { // _p_ 为空 _p_ = pidleget() // 从p空闲列表中获取一个 if _p_ == nil { // 获取失败 unlock(&sched.lock) if spinning { // 如果是自旋状态,调用方增加了nmspinning,但是没有空闲的P,因此可以取消增量并放弃 ifint32(atomic.Xadd(&sched.nmspinning, -1)) < 0 { throw("startm: negative nmspinning") } } return } } mp := mget() // 从m空闲列表中获取m if mp == nil { // 如果空闲列表没有 id := mReserveID() // 获取 m id unlock(&sched.lock)
var fn func() if spinning { fn = mspinning // 设置 m 的自旋状态函数 } newm(fn, _p_, id) // 创建一个m对象 return } unlock(&sched.lock) if mp.spinning { throw("startm: m is spinning") } if mp.nextp != 0 { throw("startm: m has p") } if spinning && !runqempty(_p_) { throw("startm: p has runnable gs") } // 由调用者确定是否自旋,并将m.nextp设置为p mp.spinning = spinning mp.nextp.set(_p_) notewakeup(&mp.park) }
funcallocm(_p_ *p, fn func(), id int64) *m { _g_ := getg() acquirem() // disable GC because it can be called from sysmon if _g_.m.p == 0 { acquirep(_p_) // 临时绑定p }
// 清理可以安全删除的m的g0栈信息 if sched.freem != nil { lock(&sched.lock) var newList *m for freem := sched.freem; freem != nil; { if freem.freeWait != 0 { next := freem.freelink freem.freelink = newList newList = freem freem = next continue } stackfree(freem.g0.stack) // 清空freem.g0的栈信息 freem = freem.freelink } sched.freem = newList // 更新已被释放的m列表 unlock(&sched.lock) }
// 如果_g_绑定的m有锁定的g,则抛弃_g_,转而执行锁定的g if _g_.m.lockedg != 0 { stoplockedm() execute(_g_.m.lockedg.ptr(), false) // Never returns. }
top: pp := _g_.m.p.ptr() pp.preempt = false
// 如果准备GC,则休眠当前m,直到被唤醒 if sched.gcwaiting != 0 { gcstopm() goto top } if pp.runSafePointFn != 0 { runSafePointFn() }
checkTimers(pp, 0)
var gp *g var inheritTime bool
tryWakeP := false if trace.enabled || trace.shutdown { gp = traceReader() if gp != nil { casgstatus(gp, _Gwaiting, _Grunnable) traceGoUnpark(gp, 0) tryWakeP = true } } if gp == nil && gcBlackenEnabled != 0 { // 找GCWorker gp = gcController.findRunnableGCWorker(_g_.m.p.ptr()) tryWakeP = tryWakeP || gp != nil } if gp == nil { // 为了让全局可执行队列的g能够运行,这里每操作一定次数就从全局队列中获取 if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 { lock(&sched.lock) gp = globrunqget(_g_.m.p.ptr(), 1) unlock(&sched.lock) } } if gp == nil { // 从本地可执行队列中获取 gp, inheritTime = runqget(_g_.m.p.ptr()) } if gp == nil { // 从其他地方找一个g来执行,如果没有则阻塞在这里 gp, inheritTime = findrunnable() // blocks until work is available }
// This thread is going to run a goroutine and is not spinning anymore, // so if it was marked as spinning we need to reset it now and potentially // start a new spinning M. if _g_.m.spinning { // 如果当前m正在自旋,则重置自旋状态 resetspinning() } if tryWakeP { wakep() // GCworker 或 tracereader 需要唤醒p } if gp.lockedm != 0 { // m将自己的p让给gp锁定的m,自己阻塞等待新p startlockedm(gp) goto top }
top: _p_ := _g_.m.p.ptr() if sched.gcwaiting != 0 { gcstopm() goto top } if _p_.runSafePointFn != 0 { runSafePointFn() }
now, pollUntil, _ := checkTimers(_p_, 0)
// 如果有finalizer可用,直接唤醒 if fingwait && fingwake { if gp := wakefing(); gp != nil { ready(gp, 0, true) } }
// 本地获取 if gp, inheritTime := runqget(_p_); gp != nil { return gp, inheritTime }
// 全局获取 // global runq if sched.runqsize != 0 { lock(&sched.lock) gp := globrunqget(_p_, 0) unlock(&sched.lock) if gp != nil { return gp, false } }
// 没有可以执行的goroutine
// 获取网络事件完成的gp,优化 if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 { if list := netpoll(0); !list.empty() { // non-blocking gp := list.pop() injectglist(&list) casgstatus(gp, _Gwaiting, _Grunnable) if trace.enabled { traceGoUnpark(gp, 0) } return gp, false } }
// 从其他的P偷取 // Steal work from other P's. procs := uint32(gomaxprocs) ranTimer := false // 将m置为自旋状态 if !_g_.m.spinning { _g_.m.spinning = true atomic.Xadd(&sched.nmspinning, 1) }
// 随机从别的p中偷取4次 for i := 0; i < 4; i++ { for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() { if sched.gcwaiting != 0 { goto top } stealRunNextG := i > 2// first look for ready queues with more than 1 g p2 := allp[enum.position()] if _p_ == p2 { continue } if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil { return gp, false }
if i > 2 || (i > 1 && shouldStealTimers(p2)) { tnow, w, ran := checkTimers(p2, now) now = tnow if w != 0 && (pollUntil == 0 || w < pollUntil) { pollUntil = w } if ran { if gp, inheritTime := runqget(_p_); gp != nil { return gp, inheritTime } ranTimer = true } } } } if ranTimer { // Running a timer may have made some goroutine ready. goto top } // ... 省略 ... }
findrunnable函数主要流程:
如果有finalizer可执行gp,直接唤醒
如果从本地可执行队列中获取可执行gp,返回gp
如果从全局可执行队列中获取可执行gp,返回gp
如果有就绪的网络事件的gp,返回gp
从其他的p中偷取部分gp,返回gp
runqsteal
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// runtime/proc.go
funcrunqsteal(_p_, p2 *p, stealRunNextG bool) *g { t := _p_.runqtail n := runqgrab(p2, &_p_.runq, t, stealRunNextG) if n == 0 { returnnil } n-- gp := _p_.runq[(t+n)%uint32(len(_p_.runq))].ptr() if n == 0 { return gp } h := atomic.LoadAcq(&_p_.runqhead) atomic.StoreRel(&_p_.runqtail, t+n) return gp }
func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 { for { h := atomic.LoadAcq(&_p_.runqhead) t := atomic.LoadAcq(&_p_.runqtail) n := t - h n = n - n/2 if n == 0 { if stealRunNextG { // Try to steal from _p_.runnext. if next := _p_.runnext; next != 0 { // 休眠让p不会执行将要偷取的 if _p_.status == _Prunning { if GOOS != "windows" { usleep(3) } else { osyield() } } if !_p_.runnext.cas(next, 0) { continue } batch[batchHead%uint32(len(batch))] = next return 1 } } return 0 } if n > uint32(len(_p_.runq)/2) { // 保证队列没有改动 continue } // 偷取前半g可执行队列 for i := uint32(0); i < n; i++ { g := _p_.runq[(h+i)%uint32(len(_p_.runq))] batch[(batchHead+i)%uint32(len(batch))] = g } // 提交本次消费,如果失败则从新再试一次 if atomic.CasRel(&_p_.runqhead, h, h+n) { return n } } }
runqgrab函数主要流程:
原子获取待偷取p可执行队列首尾位置
如果没有,则判断是否需要偷取next指针的gp
复制p的前半部分可执行队列
原子的修改p的可执行队列的首尾指针位置
M 自旋机制
M 自旋(spinning)是 Go 调度器中的一个重要优化机制,用于在 M 没有可执行的 G 时,不立即进入休眠状态,而是短暂地”自旋”等待,尝试从其他 P 偷取 G 或等待新的 G 出现。
自旋的概念
自旋是指 M 在找不到可执行的 G 时,不立即进入休眠状态,而是保持运行状态,循环尝试获取可执行的 G。这是一种性能优化策略,可以:
减少上下文切换:避免频繁地让 M 进入休眠和唤醒,减少系统调用开销
提高响应速度:当新的 G 出现时,自旋的 M 可以立即执行,无需等待唤醒
平衡负载:自旋的 M 会尝试从其他 P 偷取 G,实现负载均衡
自旋状态的管理
自旋状态的设置
在 findrunnable 函数中,当 M 开始尝试从其他 P 偷取 G 时,会设置自旋状态:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
// runtime/proc.go
funcfindrunnable() (gp *g, inheritTime bool) { // ... 省略前面的代码 ... // 将m置为自旋状态 if !_g_.m.spinning { _g_.m.spinning = true atomic.Xadd(&sched.nmspinning, 1) // 全局自旋 M 计数加 1 } // 随机从别的p中偷取4次 for i := 0; i < 4; i++ { // ... 偷取逻辑 ... } }
funcschedule() { // ... 省略前面的代码 ... // 从 findrunnable 获取 G if gp == nil { gp, inheritTime = findrunnable() // blocks until work is available }
// This thread is going to run a goroutine and is not spinning anymore, // so if it was marked as spinning we need to reset it now and potentially // start a new spinning M. if _g_.m.spinning { // 如果当前m正在自旋,则重置自旋状态 resetspinning() } // ... 执行 G ... }
resetspinning 函数的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// runtime/proc.go
funcresetspinning() { _g_ := getg() if !_g_.m.spinning { throw("resetspinning: not a spinning m") } _g_.m.spinning = false nmspinning := atomic.Xadd(&sched.nmspinning, -1) ifint32(nmspinning) < 0 { throw("findrunnable: negative nmspinning") } // M 停止自旋后,如果还有可执行的 G,尝试唤醒新的 M if nmspinning == 0 && atomic.Load(&sched.npidle) > 0 { wakep() } }
重置流程:
检查 M 确实在自旋状态
清除 M 的本地自旋标志
全局自旋计数减 1
如果全局没有自旋的 M 且有空闲的 P,尝试唤醒新的 M
自旋的触发条件
M 进入自旋状态需要满足以下条件:
本地队列为空:当前 P 的本地运行队列没有可执行的 G
全局队列为空:全局运行队列也没有可执行的 G
网络轮询无结果:没有就绪的网络事件
需要尝试工作窃取:准备从其他 P 偷取 G
自旋时的行为
当 M 处于自旋状态时,会执行以下操作:
工作窃取(Work Stealing):
随机选择其他 P
尝试从选中的 P 的本地队列偷取一半的 G
最多尝试 4 轮,每轮遍历所有 P
检查定时器:
检查其他 P 的定时器
如果有到期的定时器,执行定时器回调
定时器回调可能会创建新的可执行 G
检查网络事件:
检查是否有就绪的网络 IO 事件
如果有,将对应的 G 唤醒
自旋的退出条件
M 会在以下情况下退出自旋状态:
找到可执行的 G:
从其他 P 偷取到 G
定时器回调创建了新的 G
网络事件就绪,对应的 G 被唤醒
进入休眠:
如果长时间找不到可执行的 G,M 会退出自旋并进入休眠
休眠的 M 会被放入空闲 M 列表
GC 等待:
如果系统需要 GC,自旋的 M 会停止自旋并等待 GC 完成
自旋数量的控制
Go 调度器通过 sched.nmspinning 控制全局自旋 M 的数量,避免过多的 M 同时自旋浪费 CPU:
flowchart TD
A[M 找不到可执行的 G] --> B{本地队列为空?}
B -->|否| C[从本地队列获取 G]
B -->|是| D{全局队列为空?}
D -->|否| E[从全局队列获取 G]
D -->|是| F{网络事件就绪?}
F -->|是| G[唤醒网络 G]
F -->|否| H[设置自旋状态]
H --> I[增加全局自旋计数]
I --> J[尝试从其他 P 偷取 G]
J --> K{偷取成功?}
K -->|是| L[重置自旋状态]
K -->|否| M{还有 P 未检查?}
M -->|是| J
M -->|否| N[退出自旋,进入休眠]
L --> O[执行 G]
C --> O
E --> O
G --> O
style H fill:#ffcccc
style I fill:#ffcccc
style L fill:#ccffcc
style N fill:#ffffcc
总结
M 自旋是 Go 调度器的一个重要优化机制,它通过让 M 在找不到可执行的 G 时短暂自旋,而不是立即休眠,来提高系统的响应速度和吞吐量。自旋机制需要平衡响应速度和 CPU 利用率,通过控制自旋 M 的数量来实现这一平衡。
funcgoschedImpl(gp *g) { status := readgstatus(gp) if status&^_Gscan != _Grunning { dumpgstatus(gp) throw("bad g status") } casgstatus(gp, _Grunning, _Grunnable) dropg() // 解绑g和m lock(&sched.lock) globrunqput(gp) // 放入全局可执行队列 unlock(&sched.lock)
schedule() // 下一次调度 }
Gosched函数主要流程:
获取gp的状态
切换gp的状态为_Grunnable
解绑g和m
将g存入全局可执行队列中
启动下一次调度
销毁
1 2 3 4 5
TEXT runtime·goexit(SB),NOSPLIT,$0-0 BYTE $0x90 // NOP CALL runtime·goexit1(SB) // does not return // traceback from goexit1 must hit code range of goexit BYTE $0x90 // NOP
1 2 3 4 5 6 7 8 9
funcgoexit1() { if raceenabled { racegoend() } if trace.enabled { traceGoEnd() } mcall(goexit0) }
funcpreemptone(_p_ *p)bool { mp := _p_.m.ptr() if mp == nil || mp == getg().m { // 如果mp为空,或mp是当前运行的m returnfalse } gp := mp.curg if gp == nil || gp == mp.g0 { // gp 不能使 g0 returnfalse }
gp.preempt = true// 标志gp可以被抢占
gp.stackguard0 = stackPreempt // 直接设置为栈顶,方便触发栈扩容
// Request an async preemption of this P. if preemptMSupported && debug.asyncpreemptoff == 0 { _p_.preempt = true// 标记p快速调度 preemptM(mp) // 向mp发送抢占信号 }
// Leave SP around for GC and traceback. save(pc, sp) // 保存现场 _g_.syscallsp = sp _g_.syscallpc = pc casgstatus(_g_, _Grunning, _Gsyscall) // 切换状态 if _g_.syscallsp < _g_.stack.lo || _g_.stack.hi < _g_.syscallsp { systemstack(func() { print("entersyscall inconsistent ", hex(_g_.syscallsp), " [", hex(_g_.stack.lo), ",", hex(_g_.stack.hi), "]\n") throw("entersyscall") }) }
if trace.enabled { systemstack(traceGoSysCall) // systemstack itself clobbers g.sched.{pc,sp} and we might // need them later when the G is genuinely blocked in a // syscall save(pc, sp) }
if atomic.Load(&sched.sysmonwait) != 0 { systemstack(entersyscall_sysmon) save(pc, sp) }
if _g_.m.p.ptr().runSafePointFn != 0 { // runSafePointFn may stack split if run on this stack systemstack(runSafePointFn) save(pc, sp) }