graph TB
A[Go Timer] --> B[Timer 单次定时器]
A --> C[Ticker 周期定时器]
B --> B1[time.After]
B --> B2[time.NewTimer]
B --> B3[time.AfterFunc]
C --> C1[time.NewTicker]
C --> C2[time.Tick]
style A fill:#51CF66
style B fill:#74C0FC
style C fill:#FFD43B
graph TB
A[堆操作] --> B[添加 timer]
A --> C[删除 timer]
A --> D[调整堆]
A --> E[清理僵尸 timer]
B --> F[需要修改 heap 数组]
C --> F
D --> F
E --> F
F --> G[必须加锁保护]
style G fill:#FF6B6B
// adjust 方法需要同时修改多个 timer 的状态和堆结构 func(ts *timers) adjust(now int64, force bool) { ts.lock() // 必须加锁 // 遍历堆中的所有 timer for i := 0; i < len(ts.heap); i++ { t := ts.heap[i].timer t.lock() // 锁定单个 timer // 修改 timer 状态和堆结构 if t.state&timerZombie != 0 { // 从堆中移除 ts.heap[i] = ts.heap[len(ts.heap)-1] ts.heap = ts.heap[:len(ts.heap)-1] } t.unlock() } ts.unlock() }
锁的设计考虑
虽然需要加锁,但 Go 通过以下方式优化性能:
每个 P 独立的锁:减少锁竞争范围
快速路径优化:使用原子操作(astate、len)避免频繁加锁
延迟调整:批量处理 timer 修改,减少锁持有时间
僵尸清理:延迟清理已停止的 timer,避免频繁堆操作
graph TB
A[Timer 操作] --> B{需要修改堆?}
B -->|否| C[使用原子操作 astate, len]
B -->|是| D[加锁 timers.mu]
D --> E[批量处理]
E --> F[快速解锁]
style C fill:#51CF66
style D fill:#FF6B6B
style E fill:#FFD43B
总结
虽然 timers 是每个 P 独立维护的,但需要加锁的原因包括:
✅ 调度器跨 P 访问:sysmon 等需要检查所有 P 的 timers
✅ Timer 跨 P 操作:Stop/Reset 可能在创建 timer 的 P 之外执行
✅ Goroutine 迁移:goroutine 可能在不同 P 之间迁移
✅ 堆结构保护:堆操作需要保证原子性
✅ 状态一致性:timer 状态和堆结构需要保持一致
通过每个 P 独立的锁和优化策略,Go 在保证正确性的同时,最大程度减少了锁竞争。
Timer 的添加流程
sequenceDiagram
participant U as 用户代码
participant R as Runtime
participant P as P (Processor)
participant H as Timer Heap
U->>R: time.NewTimer(duration)
R->>R: 计算 when = now + duration
R->>P: 获取当前 P
P->>H: 将 timer 添加到堆
H->>H: 堆化操作(上浮)
P->>P: 如果是最小 timer,唤醒 timerproc
R->>U: 返回 Timer 对象
Go 不再使用独立的 timerproc goroutine,而是通过 check 方法在调度过程中检查并触发到期的 timer:
flowchart TD
A[check 被调用] --> B[获取 wakeTime]
B --> C{有 timer?}
C -->|否| D[返回]
C -->|是| E{已到期?}
E -->|否| F[返回下次唤醒时间]
E -->|是| G[加锁 timers]
G --> H[调用 adjust 调整堆]
H --> I[循环调用 run]
I --> J{有 timer 到期?}
J -->|是| K[执行 timer]
K --> L{是 Ticker?}
L -->|是| M[更新 when 并重新入堆]
L -->|否| N[标记为僵尸或移除]
M --> I
N --> I
J -->|否| O[解锁并返回]
// modify 修改现有 timer func(t *timer) modify(when, period int64, f func(any, uintptr, int64), arg any, seq uintptr) bool { if when <= 0 { throw("timer when must be positive") } if period < 0 { throw("timer period must be non-negative") } async := debug.asynctimerchan.Load() != 0 if !async && t.isChan { lock(&t.sendLock) } t.lock() if async { t.maybeRunAsync() } oldPeriod := t.period t.period = period if f != nil { t.f = f t.arg = arg t.seq = seq } wake := false pending := t.when > 0 t.when = when if t.state&timerHeaped != 0 { t.state |= timerModified if t.state&timerZombie != 0 { // 在堆中但标记为删除(通过 Stop) // 取消标记,因为它已被 Reset 并将再次运行 t.ts.zombies.Add(-1) t.state &^= timerZombie } // 对应的 heap[i].when 稍后更新 if min := t.ts.minWhenModified.Load(); min == 0 || when < min { wake = true t.astate.Store(t.state) t.ts.updateMinWhenModified(when) } } add := t.needsAdd() if !async && t.isChan { t.seq++ if oldPeriod == 0 && t.isSending.Load() > 0 { pending = true } } t.unlock() if !async && t.isChan { if timerchandrain(t.hchan()) { pending = true } unlock(&t.sendLock) } if add { t.maybeAdd() } if wake { wakeNetPoller(when) } return pending }
sequenceDiagram
participant U as 用户代码
participant T as Timer
participant R as Runtime
participant P as P
U->>T: Reset(duration)
T->>R: resetTimer
R->>P: 获取当前 P
P->>P: 加锁
P->>P: 如果 timer 在堆中,标记为已删除
P->>P: 更新 when 时间
P->>P: 重新添加到堆
P->>P: 解锁
R->>U: 返回结果
Timer 堆的实现
最小堆特性
Go 的 timer 堆是一个最小堆,具有以下特性:
堆顶元素最小:堆顶的 timer 是最早触发的
完全二叉树:使用数组实现
堆序性质:父节点的 when 值小于等于子节点
graph TB
A[Timer1 when: 100 索引: 0] --> B[Timer2 when: 200 索引: 1]
A --> C[Timer3 when: 300 索引: 2]
B --> D[Timer4 when: 400 索引: 3]
B --> E[Timer5 when: 500 索引: 4]
C --> F[Timer6 when: 600 索引: 5]
C --> G[Timer7 when: 700 索引: 6]
style A fill:#51CF66
// siftDown 通过向下移动将位置 i 的 timer 放在堆的正确位置 func(ts *timers) siftDown(i int) { heap := ts.heap n := len(heap) if i >= n { badTimer() } if i*timerHeapN+1 >= n { return } tw := heap[i] if tw.when <= 0 { badTimer() } for { leftChild := i*timerHeapN + 1 if leftChild >= n { break } w := tw c := -1 for j, tw := range heap[leftChild:min(leftChild+timerHeapN, n)] { if tw.less(w) { w = tw c = leftChild + j } } if c < 0 { break } heap[i] = heap[c] i = c } if heap[i].timer != tw.timer { heap[i] = tw } }
四叉堆 vs 二叉堆
Go 使用四叉堆而不是二叉堆,原因:
减少堆高度:四叉堆的高度约为二叉堆的一半
减少比较次数:虽然每次比较的子节点更多,但总体比较次数更少
更好的缓存局部性:访问模式更友好
graph TB
A[二叉堆] --> A1[高度: log₂n]
A --> A2[每次比较 2 个子节点]
B[四叉堆] --> B1[高度: log₄n ≈ log₂n/2]
B --> B2[每次比较 4 个子节点]
style A fill:#FF6B6B
style B fill:#51CF66
Timer 的调度优化
Timer 调整机制
当 timer 被修改(Reset)或停止(Stop)时,Go 使用延迟调整机制来优化性能:
graph TB
A[Timer 被修改] --> B[设置 timerModified 位]
B --> C[更新 minWhenModified]
C --> D[等待 adjust 调用]
D --> E[批量处理所有修改的 timer]
E --> F[更新堆结构]
style A fill:#74C0FC
style E fill:#51CF66
graph TB
A[Netpoller] --> B[Linux: epoll]
A --> C[BSD/macOS: kqueue]
A --> D[Windows: IOCP]
B --> E[epoll_wait]
C --> F[kqueue]
D --> G[GetQueuedCompletionStatus]
style A fill:#51CF66
style B fill:#74C0FC
style C fill:#74C0FC
style D fill:#74C0FC
Timer 如何与 Netpoller 结合
1. 唤醒机制
当 timer 需要等待时,通过 wakeNetPoller() 通知 netpoller:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
// runtime/time.go
// wakeNetPoller 唤醒 netpoller(如果它正在等待) // 如果 when 在将来,netpoller 会在 when 时间唤醒 funcwakeNetPoller(when int64) { if atomic.Load64(&sched.lastpoll) == 0 { // 如果 netpoller 未初始化,初始化它 netpollGenericInit() } // 计算需要等待的时间 delta := when - nanotime() if delta < 0 { delta = 0 } // 通知 netpoller 新的唤醒时间 netpollBreak() }
2. Netpoller 等待流程
Netpoller 在等待网络事件时,会同时等待 timer 触发:
sequenceDiagram
participant S as Scheduler
participant T as Timer
participant N as Netpoller
participant E as epoll/kqueue
participant O as OS
S->>N: 调用 netpoll
N->>T: 获取最小 timer 时间 (wakeTime)
T->>N: 返回 when 时间
N->>E: epoll_wait(timeout=when-now)
E->>O: 等待网络事件或超时
O->>E: 超时(timer 到期)或网络事件
E->>N: 返回事件
N->>S: 返回就绪的 goroutine
S->>S: 检查并触发到期 timer
funcnetpoll(delay int64) gList { if epfd == -1 { return gList{} } var events [128]epollevent var waitms int32 if delay < 0 { waitms = -1// 无限等待 } elseif delay == 0 { waitms = 0// 不等待 } else { waitms = int32(delay / 1e6) // 转换为毫秒 } // 调用 epoll_wait,等待网络事件或超时 n := epollwait(epfd, &events[0], int32(len(events)), waitms) if n < 0 { if n != -_EINTR { return gList{} } // 被中断,可能是 timer 唤醒 return gList{} } var toRun gList for i := int32(0); i < n; i++ { ev := &events[i] if ev.events == 0 { continue } // 处理网络事件 if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd { // 这是 timer 唤醒信号 continue } // 唤醒等待网络 I/O 的 goroutine // ... } return toRun }
5. Timer 检查时机
Timer 在以下时机被检查:
graph TB
A[调度循环] --> B[findrunnable]
B --> C[checkTimers]
C --> D{有 timer 到期?}
D -->|是| E[触发 timer]
D -->|否| F[计算下次触发时间]
F --> G[netpoll]
G --> H{超时或网络事件?}
H -->|超时| I[Timer 到期]
H -->|网络事件| J[处理网络 I/O]
I --> C
J --> B
style E fill:#51CF66
style I fill:#51CF66
style J fill:#74C0FC
代码流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
// runtime/proc.go
// checkTimers 检查并运行到期的 timer funccheckTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) { if atomic.Load(&pp.deletedTimers) == 0 && atomic.Load(&pp.numTimers) == 0 { return now, 0, false } // 检查并运行到期的 timer rnow, pollUntil, ran = pp.timers.check(now, nil) return rnow, pollUntil, ran }
集成优势
1. 精确唤醒
操作系统内核会在精确时间唤醒 netpoller,无需轮询:
1 2
// 操作系统会在 when 时间精确唤醒 epoll_wait(epfd, events, maxevents, timeout) // timeout = when - now
sequenceDiagram
participant T as Timer 到期
participant N as Netpoller
participant S as Scheduler
participant G as Goroutine
T->>N: 超时唤醒
N->>S: 返回就绪事件
S->>G: 调度等待 timer 的 goroutine
G->>G: 继续执行
完整集成流程
flowchart TD
A[创建 Timer] --> B[添加到堆]
B --> C[计算 when 时间]
C --> D[wakeNetPoller]
D --> E[Netpoller 记录唤醒时间]
E --> F[调度循环]
F --> G[checkTimers]
G --> H{有 timer 到期?}
H -->|否| I[计算 pollUntil]
I --> J[netpoll]
J --> K[epoll_wait timeout=pollUntil]
K --> L{超时或事件?}
L -->|超时| M[Timer 到期]
L -->|网络事件| N[处理网络 I/O]
M --> O[触发 timer 回调]
N --> F
O --> F
H -->|是| O
style M fill:#51CF66
style O fill:#51CF66
style N fill:#74C0FC
代码示例
Timer 添加时唤醒 Netpoller
1 2 3 4 5 6 7 8 9 10 11 12
// runtime/time.go
func(t *timer) maybeAdd() { // ... 添加到堆 wakeTime := ts.wakeTime() wake := wakeTime == 0 || when < wakeTime if wake { wakeNetPoller(when) // 通知 netpoller 新的唤醒时间 } }
调度时检查 Timer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// runtime/proc.go
funcfindrunnable() (gp *g, inheritTime bool) { // 检查 timer now, pollUntil, ran := checkTimers(pp, 0) // 如果 timer 已运行,可能已经找到了可运行的 goroutine if ran { // 继续查找 } // 调用 netpoll,传入 timer 的等待时间 if netpollinited() && pollUntil != 0 { list := netpoll(pollUntil) // 处理网络事件和 timer 超时 } }
平台特定实现
不同平台使用不同的 I/O 多路复用机制:
平台
机制
系统调用
Linux
epoll
epoll_wait()
BSD/macOS
kqueue
kevent()
Windows
IOCP
GetQueuedCompletionStatusEx()
所有实现都支持传入超时时间,从而实现 timer 的精确唤醒。
总结
Timer 与 Netpoller 的集成实现了:
✅ 精确唤醒:操作系统内核在精确时间唤醒
✅ 低 CPU 占用:无需轮询,CPU 可用于其他工作
✅ 统一事件循环:Timer 和网络 I/O 共享同一事件循环
✅ 高效调度:Timer 到期时立即调度相关 goroutine
✅ 跨平台支持:使用各平台最优的 I/O 多路复用机制
这种设计使得 Go 的 timer 系统既精确又高效,能够处理大量并发 timer 而不会显著影响系统性能。
funcmonitorTimers() { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() for { <-ticker.C var m runtime.MemStats runtime.ReadMemStats(&m) // 监控内存使用,间接反映 Timer 数量 log.Printf("NumGC: %d, Alloc: %d", m.NumGC, m.Alloc) } }
Timer 与 Goroutine 调度
Timer 触发时的调度
当 Timer 触发时,会唤醒等待的 goroutine:
sequenceDiagram
participant T as Timer
participant G as Goroutine
participant S as Scheduler
T->>T: 到期触发
T->>G: 发送到 channel
G->>S: 从阻塞状态唤醒
S->>G: 调度执行
G->>G: 继续执行
Timer 与 Select 语句
1 2 3 4 5 6 7 8
select { case <-timer.C: // Timer 触发 case <-otherChan: // 其他 channel 就绪 case <-ctx.Done(): // Context 取消 }
当多个 case 同时就绪时,Go 会随机选择一个,但 Timer 的 case 可能不会被选中(如果其他 case 也就绪)。