推荐阅读:Go Channel Source Code
Channel 概述 Channel 是 Go 语言中用于 goroutine 之间通信的机制,是 Go 语言并发编程的核心组件之一。Channel 提供了一种类型安全、线程安全的方式来传递数据和同步执行。
设计理念 Channel 遵循 CSP(Communicating Sequential Processes)模型,核心思想是:“不要通过共享内存来通信,而要通过通信来共享内存” 。
graph LR
A[Goroutine 1] -->|发送数据| B[Channel]
B -->|接收数据| C[Goroutine 2]
style A fill:#ccffcc
style B fill:#ffffcc
style C fill:#ccffcc
Channel 的特点
✅ 类型安全 :编译时检查类型
✅ 线程安全 :多个 goroutine 可以安全地并发访问
✅ 阻塞机制 :自动处理阻塞和唤醒
✅ 同步原语 :可以用于同步多个 goroutine
基本用法 Channel 的定义和创建 声明 Channel 1 2 3 4 5 6 var ch chan int ch := make (chan int ) ch := make (chan int , 10 )
无缓冲 Channel 无缓冲 channel 的容量为 0,发送和接收操作必须同时准备好,否则会阻塞。
1 2 3 4 5 6 7 8 9 10 11 func main () { ch := make (chan int ) go func () { ch <- 42 fmt.Println("数据已发送" ) }() value := <-ch fmt.Println("接收到数据:" , value) }
有缓冲 Channel 有缓冲 channel 有一个固定大小的缓冲区,只有当缓冲区满时发送才会阻塞,只有当缓冲区空时接收才会阻塞。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func main () { ch := make (chan int , 3 ) ch <- 1 ch <- 2 ch <- 3 fmt.Println(<-ch) fmt.Println(<-ch) fmt.Println(<-ch) }
发送和接收操作 基本语法 1 2 3 ch <- value value := <-ch <-ch
发送操作 1 2 3 4 5 6 7 8 ch := make (chan int ) ch <- 42
接收操作 1 2 3 4 5 6 7 8 9 10 11 12 13 ch := make (chan int ) value := <-ch value, ok := <-ch if !ok { fmt.Println("channel 已关闭" ) } <-ch
Channel 的方向 Channel 可以是单向的(只发送或只接收),用于限制函数的行为。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 func sendOnly (ch chan <- int ) { ch <- 42 } func receiveOnly (ch <-chan int ) { value := <-ch } func main () { ch := make (chan int ) go sendOnly(ch) go receiveOnly(ch) }
Channel 的关闭 关闭 Channel 使用 close() 函数关闭 channel。关闭 channel 后,不能再向 channel 发送数据,但可以继续接收已发送的数据。
1 2 3 4 5 6 7 8 9 ch := make (chan int , 3 ) ch <- 1 ch <- 2 ch <- 3 close (ch)
检测 Channel 是否关闭 接收操作可以返回两个值:接收到的值和 channel 是否已关闭的布尔值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 ch := make (chan int ) go func () { ch <- 1 ch <- 2 close (ch) }() for { value, ok := <-ch if !ok { fmt.Println("channel 已关闭" ) break } fmt.Println("接收到:" , value) }
关闭 Channel 的原则
只有发送者应该关闭 channel ,接收者不应该关闭
不要关闭已关闭的 channel ,会导致 panic
关闭 nil channel 会导致 panic
1 2 3 4 5 6 7 8 9 10 11 12 13 func producer (ch chan <- int ) { defer close (ch) for i := 0 ; i < 10 ; i++ { ch <- i } } func consumer (ch <-chan int ) { for value := range ch { fmt.Println(value) } }
Select 语句 Select 基本用法 select 语句用于在多个 channel 操作中选择一个执行,类似于 switch,但专门用于 channel。
1 2 3 4 5 6 7 8 9 10 select {case ch1 <- value1: case value2 := <-ch2: case <-time.After(time.Second): default : }
Select 的阻塞行为 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 select {case ch1 <- 1 : fmt.Println("发送到 ch1" ) case value := <-ch2: fmt.Println("从 ch2 接收:" , value) } select {case ch <- 1 : fmt.Println("发送成功" ) default : fmt.Println("发送失败,channel 已满" ) }
Select 的常见模式 1. 超时控制 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 func main () { ch := make (chan string ) go func () { time.Sleep(2 * time.Second) ch <- "结果" }() select { case result := <-ch: fmt.Println("收到结果:" , result) case <-time.After(1 * time.Second): fmt.Println("超时" ) } }
2. 取消操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 func main () { done := make (chan struct {}) go func () { for { select { case <-done: fmt.Println("操作被取消" ) return default : time.Sleep(100 * time.Millisecond) } } }() time.Sleep(500 * time.Millisecond) close (done) time.Sleep(100 * time.Millisecond) }
3. 多路复用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 func main () { ch1 := make (chan int ) ch2 := make (chan int ) go func () { for i := 0 ; i < 5 ; i++ { ch1 <- i time.Sleep(100 * time.Millisecond) } close (ch1) }() go func () { for i := 10 ; i < 15 ; i++ { ch2 <- i time.Sleep(150 * time.Millisecond) } close (ch2) }() for { select { case v, ok := <-ch1: if !ok { ch1 = nil } else { fmt.Println("从 ch1 接收:" , v) } case v, ok := <-ch2: if !ok { ch2 = nil } else { fmt.Println("从 ch2 接收:" , v) } } if ch1 == nil && ch2 == nil { break } } }
Range 遍历 Channel 使用 range 可以遍历 channel,当 channel 关闭时,循环会自动结束。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func main () { ch := make (chan int ) go func () { for i := 0 ; i < 5 ; i++ { ch <- i } close (ch) }() for value := range ch { fmt.Println(value) } }
Range 的等价写法 1 2 3 4 5 6 7 8 9 10 11 12 13 for value := range ch { fmt.Println(value) } for { value, ok := <-ch if !ok { break } fmt.Println(value) }
Channel 的底层实现 hchan 结构体 Channel 的底层实现是 hchan 结构体,定义在 runtime/chan.go 中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 type hchan struct { qcount uint dataqsiz uint buf unsafe.Pointer elemsize uint16 closed uint32 elemtype *_type sendx uint recvx uint recvq waitq sendq waitq lock mutex }
关键字段说明
buf :有缓冲 channel 的环形队列
qcount :队列中元素数量
dataqsiz :队列容量
sendx/recvx :发送/接收索引
recvq/sendq :等待接收/发送的 goroutine 队列
lock :保护 channel 的互斥锁
Channel 的内存布局 graph TB
A[hchan 结构体] --> B[环形缓冲区 buf]
A --> C[接收等待队列 recvq]
A --> D[发送等待队列 sendq]
A --> E[互斥锁 lock]
B --> F[元素 0]
B --> G[元素 1]
B --> H[元素 ...]
B --> I[元素 n-1]
style A fill:#ffcccc
style B fill:#ccffcc
style C fill:#ffffcc
style D fill:#ffffcc
发送操作流程 sequenceDiagram
participant G as Goroutine
participant C as Channel
participant Q as 等待队列
G->>C: ch <- value
alt 有等待的接收者
C->>Q: 直接传递给接收者
Q->>G: 唤醒接收者
else 缓冲区有空间
C->>C: 写入缓冲区
G->>G: 继续执行
else 缓冲区已满
G->>Q: 加入发送等待队列
G->>G: 阻塞等待
end
接收操作流程 sequenceDiagram
participant G as Goroutine
participant C as Channel
participant Q as 等待队列
G->>C: <-ch
alt 有等待的发送者
C->>Q: 直接从发送者接收
Q->>G: 唤醒发送者
else 缓冲区有数据
C->>C: 从缓冲区读取
G->>G: 继续执行
else 缓冲区为空
G->>Q: 加入接收等待队列
G->>G: 阻塞等待
end
源码分析 发送操作(chansend) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 func chansend (c *hchan, ep unsafe.Pointer, block bool , callerpc uintptr ) bool { if c == nil { if !block { return false } gopark(...) } if !block && c.closed == 0 && full(c) { return false } lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic ("send on closed channel" ) } if sg := c.recvq.dequeue(); sg != nil { send(c, sg, ep, func () { unlock(&c.lock) }, 3 ) return true } if c.qcount < c.dataqsiz { qp := chanbuf(c, c.sendx) typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true } if !block { unlock(&c.lock) return false } gp := getg() mysg := acquireSudog() mysg.elem = ep mysg.waitlink = nil mysg.g = gp c.sendq.enqueue(mysg) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2 ) }
接收操作(chanrecv) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 func chanrecv (c *hchan, ep unsafe.Pointer, block bool ) (selected, received bool ) { if c == nil { if !block { return } gopark(...) } if !block && empty(c) { if atomic.Load(&c.closed) == 0 { return } if empty(c) { if ep != nil { typedmemclr(c.elemtype, ep) } return true , false } } lock(&c.lock) if c.closed != 0 && c.qcount == 0 { unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true , false } if sg := c.sendq.dequeue(); sg != nil { recv(c, sg, ep, func () { unlock(&c.lock) }, 3 ) return true , true } if c.qcount > 0 { qp := chanbuf(c, c.recvx) if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true , true } if !block { unlock(&c.lock) return false , false } gp := getg() mysg := acquireSudog() mysg.elem = ep mysg.waitlink = nil mysg.g = gp c.recvq.enqueue(mysg) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2 ) }
Channel 的阻塞和非阻塞行为 阻塞行为 无缓冲 Channel 1 2 3 4 5 6 ch := make (chan int ) ch <- 1 value := <-ch
有缓冲 Channel 1 2 3 4 5 6 7 8 9 ch := make (chan int , 2 ) ch <- 1 ch <- 2 ch <- 3 value := <-ch value = <-ch value = <-ch
非阻塞操作 使用 select 的 default 分支可以实现非阻塞操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 select {case ch <- value: fmt.Println("发送成功" ) default : fmt.Println("发送失败,channel 已满" ) } select {case value := <-ch: fmt.Println("接收成功:" , value) default : fmt.Println("接收失败,channel 为空" ) }
Nil Channel 对 nil channel 的操作会永远阻塞。
1 2 3 4 5 var ch chan int ch <- 1 value := <-ch
Channel 的常见模式 1. 生产者-消费者模式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func producer (ch chan <- int ) { defer close (ch) for i := 0 ; i < 10 ; i++ { ch <- i time.Sleep(100 * time.Millisecond) } } func consumer (ch <-chan int ) { for value := range ch { fmt.Println("消费:" , value) } } func main () { ch := make (chan int , 5 ) go producer(ch) consumer(ch) }
2. 工作池模式(Worker Pool) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 func worker (id int , jobs <-chan int , results chan <- int ) { for job := range jobs { fmt.Printf("Worker %d 处理任务 %d\n" , id, job) time.Sleep(time.Second) results <- job * 2 } } func main () { jobs := make (chan int , 100 ) results := make (chan int , 100 ) for w := 1 ; w <= 3 ; w++ { go worker(w, jobs, results) } for j := 1 ; j <= 5 ; j++ { jobs <- j } close (jobs) for r := 1 ; r <= 5 ; r++ { fmt.Println("结果:" , <-results) } }
3. 扇入模式(Fan-In) 将多个 channel 的数据合并到一个 channel。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 func fanIn (input1, input2 <-chan string ) <-chan string { output := make (chan string ) go func () { defer close (output) for { select { case msg := <-input1: output <- msg case msg := <-input2: output <- msg } } }() return output } func main () { ch1 := make (chan string ) ch2 := make (chan string ) go func () { for i := 0 ; i < 5 ; i++ { ch1 <- fmt.Sprintf("ch1-%d" , i) time.Sleep(100 * time.Millisecond) } close (ch1) }() go func () { for i := 0 ; i < 5 ; i++ { ch2 <- fmt.Sprintf("ch2-%d" , i) time.Sleep(150 * time.Millisecond) } close (ch2) }() for msg := range fanIn(ch1, ch2) { fmt.Println(msg) } }
4. 扇出模式(Fan-Out) 将一个 channel 的数据分发到多个 channel。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 func fanOut (input <-chan int , outputs []chan <- int ) { defer func () { for _, ch := range outputs { close (ch) } }() for value := range input { for _, ch := range outputs { ch <- value } } } func main () { input := make (chan int ) outputs := make ([]chan int , 3 ) for i := range outputs { outputs[i] = make (chan int ) go func (id int , ch <-chan int ) { for value := range ch { fmt.Printf("Worker %d 收到: %d\n" , id, value) } }(i, outputs[i]) } go func () { for i := 0 ; i < 10 ; i++ { input <- i } close (input) }() fanOut(input, outputs) time.Sleep(time.Second) }
5. 管道模式(Pipeline) 将多个处理阶段串联起来。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 func generate (nums ...int ) <-chan int { out := make (chan int ) go func () { for _, n := range nums { out <- n } close (out) }() return out } func square (in <-chan int ) <-chan int { out := make (chan int ) go func () { for n := range in { out <- n * n } close (out) }() return out } func print (in <-chan int ) { for n := range in { fmt.Println(n) } } func main () { numbers := generate(2 , 3 , 4 ) squared := square(numbers) print (squared) }
6. 超时模式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 func doWork () <-chan string { result := make (chan string ) go func () { defer close (result) time.Sleep(2 * time.Second) result <- "工作完成" }() return result } func main () { select { case result := <-doWork(): fmt.Println(result) case <-time.After(1 * time.Second): fmt.Println("超时" ) } }
7. 取消模式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 func worker (done <-chan struct {}) <-chan int { result := make (chan int ) go func () { defer close (result) for i := 0 ; ; i++ { select { case <-done: return case result <- i: time.Sleep(100 * time.Millisecond) } } }() return result } func main () { done := make (chan struct {}) result := worker(done) for i := 0 ; i < 5 ; i++ { fmt.Println(<-result) } close (done) time.Sleep(time.Second) }
Channel 与 Goroutine 的配合 Goroutine 泄漏 未关闭的 channel 可能导致 goroutine 泄漏。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 func leak () { ch := make (chan int ) go func () { ch <- 1 }() } func noLeak () { ch := make (chan int ) done := make (chan struct {}) go func () { defer close (ch) for { select { case <-done: return case ch <- 1 : time.Sleep(time.Second) } } }() close (done) time.Sleep(100 * time.Millisecond) }
等待多个 Goroutine 完成 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func main () { done := make (chan struct {}) for i := 0 ; i < 10 ; i++ { go func (id int ) { defer func () { done <- struct {}{} }() fmt.Printf("Goroutine %d 完成\n" , id) }(i) } for i := 0 ; i < 10 ; i++ { <-done } fmt.Println("所有 goroutine 完成" ) }
使用 sync.WaitGroup 替代 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import "sync" func main () { var wg sync.WaitGroup for i := 0 ; i < 10 ; i++ { wg.Add(1 ) go func (id int ) { defer wg.Done() fmt.Printf("Goroutine %d 完成\n" , id) }(i) } wg.Wait() fmt.Println("所有 goroutine 完成" ) }
Channel 的性能考虑 性能特点
无缓冲 channel :同步通信,延迟低,但可能阻塞
有缓冲 channel :异步通信,吞吐量高,但占用内存
Channel 操作有锁开销 :每次操作都需要获取锁
性能测试 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 func BenchmarkUnbufferedChannel (b *testing.B) { ch := make (chan int ) go func () { for i := 0 ; i < b.N; i++ { ch <- i } close (ch) }() for range ch { } } func BenchmarkBufferedChannel (b *testing.B) { ch := make (chan int , 100 ) go func () { for i := 0 ; i < b.N; i++ { ch <- i } close (ch) }() for range ch { } }
优化建议
合理选择缓冲区大小 :根据实际场景调整
避免频繁创建 channel :复用 channel
使用 sync 包替代简单同步 :对于简单的同步,sync.WaitGroup 等可能更高效
批量处理 :减少 channel 操作次数
最佳实践 1. 关闭 Channel 的原则
✅ 只有发送者应该关闭 channel
✅ 使用 defer close(ch) 确保关闭
✅ 接收者使用 range 或检查 ok 值
1 2 3 4 5 6 func producer (ch chan <- int ) { defer close (ch) for i := 0 ; i < 10 ; i++ { ch <- i } }
2. 避免 Goroutine 泄漏
✅ 确保所有启动的 goroutine 都能退出
✅ 使用 done channel 或 context 控制 goroutine 生命周期
✅ 及时关闭不再使用的 channel
3. 合理使用缓冲
✅ 无缓冲 channel:用于同步
✅ 有缓冲 channel:用于异步通信,提高吞吐量
✅ 缓冲区大小:根据实际场景调整,不要过大
4. 使用 Select 处理多个 Channel
✅ 使用 select 处理多个 channel
✅ 使用 nil channel 禁用 case
✅ 使用 default 实现非阻塞操作
5. 错误处理 1 2 3 4 5 6 7 8 func safeSend (ch chan <- int , value int ) error { select { case ch <- value: return nil case <-time.After(time.Second): return fmt.Errorf("发送超时" ) } }
常见问题和陷阱 1. 向已关闭的 Channel 发送数据 1 2 3 ch := make (chan int ) close (ch)ch <- 1
2. 关闭已关闭的 Channel 1 2 3 ch := make (chan int ) close (ch)close (ch)
3. 从 Nil Channel 接收数据 1 2 var ch chan int value := <-ch
4. Range 遍历未关闭的 Channel 1 2 3 4 5 6 7 8 9 10 11 12 ch := make (chan int ) go func () { for i := 0 ; i < 5 ; i++ { ch <- i } }() for value := range ch { fmt.Println(value) }
5. 死锁 1 2 3 4 5 6 func main () { ch := make (chan int ) ch <- 1 value := <-ch }
6. 竞态条件 虽然 channel 本身是线程安全的,但使用方式不当可能导致竞态条件。
1 2 3 4 5 6 7 8 9 10 11 12 13 var count int ch := make (chan int ) go func () { for i := 0 ; i < 1000 ; i++ { ch <- 1 } }() for i := 0 ; i < 1000 ; i++ { count += <-ch }
总结 Channel 是 Go 语言并发编程的核心组件,提供了类型安全、线程安全的通信机制。
关键要点
Channel 类型 :无缓冲(同步)和有缓冲(异步)
操作 :发送(<-)、接收(<-)、关闭(close)
Select :处理多个 channel 操作
Range :遍历 channel,自动检测关闭
底层实现 :hchan 结构体,使用锁和等待队列
常见模式 :生产者-消费者、工作池、扇入扇出、管道等
使用建议
✅ 遵循 CSP 模型:通过通信共享内存
✅ 合理选择缓冲大小
✅ 确保正确关闭 channel
✅ 避免 goroutine 泄漏
✅ 使用 select 处理多个 channel
参考资源