funcProducerConsumer(producers, consumers, taskCount, queueSize int) { ch := make(chanint, queueSize) var wg sync.WaitGroup
for i := 0; i < producers; i++ { wg.Add(1) gofunc(id int) { defer wg.Done() for j := 0; j < taskCount/producers; j++ { ch <- id*1000 + j } }(i) }
gofunc() { wg.Wait() close(ch) }()
for i := 0; i < consumers; i++ { gofunc(id int) { for v := range ch { fmt.Printf("consumer %d: %d\n", id, v) } }(i) }
time.Sleep(time.Second * 2) }
限制并发数(Worker Pool / 信号量)
题目:大量任务并发执行,但同一时刻最多 N 个在执行(限制并发数)。
解法一:带缓冲 channel 作信号量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
funcLimitConcurrency(tasks []func(), limit int) { sem := make(chanstruct{}, limit) var wg sync.WaitGroup
for _, t := range tasks { wg.Add(1) gofunc(fn func()) { defer wg.Done() sem <- struct{}{} deferfunc() { <-sem }() fn() }(t) }
wg.Wait() }
解法二:固定 worker 池
1 2 3 4 5 6 7 8 9 10 11 12 13
funcWorkerPool(tasks <-chanint, numWorkers int) { var wg sync.WaitGroup for i := 0; i < numWorkers; i++ { wg.Add(1) gofunc() { defer wg.Done() for t := range tasks { fmt.Println(t) } }() } wg.Wait() }
funcConcurrentSum(arr []int, numWorkers int)int { n := len(arr) if n == 0 { return0 } step := (n + numWorkers - 1) / numWorkers var wg sync.WaitGroup results := make(chanint, numWorkers)
for i := 0; i < numWorkers; i++ { start := i * step end := start + step if end > n { end = n } if start >= n { break } wg.Add(1) gofunc(slice []int) { defer wg.Done() sum := 0 for _, v := range slice { sum += v } results <- sum }(arr[start:end]) }
gofunc() { wg.Wait() close(results) }()
total := 0 for s := range results { total += s } return total }
funcConcurrentWordCount(texts []string)map[string]int { var mu sync.Mutex count := make(map[string]int) var wg sync.WaitGroup
for _, text := range texts { wg.Add(1) gofunc(s string) { defer wg.Done() words := strings.Fields(s) mu.Lock() for _, w := range words { count[w]++ } mu.Unlock() }(text) }