Load balancing is a key technique in distributed systems: by distributing requests across multiple server instances, it spreads traffic and improves the system's availability and performance. This article walks through the common load balancing algorithms and their implementations.
## Overview

A load balancing algorithm distributes traffic as evenly as possible across server instances, preventing any single server from being overloaded and improving the overall performance and availability of the system.
```mermaid
graph TB
A[Client requests] --> B[Load balancer]
B --> C[Server 1]
B --> D[Server 2]
B --> E[Server 3]
B --> F[Server N]
style B fill:#FFE66D
style C fill:#51CF66
style D fill:#51CF66
style E fill:#51CF66
style F fill:#51CF66
```
## 1. Random

### How it works

Random is the simplest load balancing algorithm: each request is handled by a randomly chosen server.
### Characteristics

Pros:

- Simple to implement
- The larger the call volume, the more even the distribution
- Works well with dynamically adjusted server weights

Cons:

- Uneven distribution with small sample sizes
- Ignores the servers' current load
- May route requests to a poorly performing server
### Go implementation

```go
package main

import (
	"math/rand"
	"time"
)

type Server struct {
	Address string
	Weight  int
}

type RandomLoadBalancer struct {
	servers []*Server
	rng     *rand.Rand
}

func NewRandomLoadBalancer(servers []*Server) *RandomLoadBalancer {
	return &RandomLoadBalancer{
		servers: servers,
		rng:     rand.New(rand.NewSource(time.Now().UnixNano())),
	}
}

func (lb *RandomLoadBalancer) Select() *Server {
	if len(lb.servers) == 0 {
		return nil
	}
	index := lb.rng.Intn(len(lb.servers))
	return lb.servers[index]
}
```
### Weighted Random

Weighted random selects servers at random with probability proportional to their weights, so higher-weight servers are chosen more often.
```go
type WeightRandomLoadBalancer struct {
	servers     []*Server
	rng         *rand.Rand
	totalWeight int
}

func NewWeightRandomLoadBalancer(servers []*Server) *WeightRandomLoadBalancer {
	totalWeight := 0
	for _, s := range servers {
		totalWeight += s.Weight
	}
	return &WeightRandomLoadBalancer{
		servers:     servers,
		rng:         rand.New(rand.NewSource(time.Now().UnixNano())),
		totalWeight: totalWeight,
	}
}

func (lb *WeightRandomLoadBalancer) Select() *Server {
	if len(lb.servers) == 0 || lb.totalWeight == 0 {
		return nil
	}
	// Draw in [0, totalWeight) and walk the cumulative weights.
	randomWeight := lb.rng.Intn(lb.totalWeight)
	currentWeight := 0
	for _, server := range lb.servers {
		currentWeight += server.Weight
		if randomWeight < currentWeight {
			return server
		}
	}
	return lb.servers[len(lb.servers)-1]
}
```
### When to use

- Servers with similar performance
- No session affinity requirement
- Distribution does not need to be perfectly even
## 2. Round Robin

### How it works

Round robin assigns requests to servers one after another in a fixed order, cycling back to the first server after the last.
```mermaid
sequenceDiagram
participant C as Client
participant LB as Load balancer
participant S1 as Server 1
participant S2 as Server 2
participant S3 as Server 3
C->>LB: Request 1
LB->>S1: Forward request 1
C->>LB: Request 2
LB->>S2: Forward request 2
C->>LB: Request 3
LB->>S3: Forward request 3
C->>LB: Request 4
LB->>S1: Forward request 4
```
### Characteristics

Pros:

- Simple to implement
- Requests are distributed evenly across servers

Cons:

- Ignores the servers' current load
- Requests pile up on slow servers
- A poor fit when server performance varies widely
### Go implementation

```go
type RoundRobinLoadBalancer struct {
	servers []*Server
	current int
	mutex   sync.Mutex
}

func NewRoundRobinLoadBalancer(servers []*Server) *RoundRobinLoadBalancer {
	return &RoundRobinLoadBalancer{
		servers: servers,
		current: 0,
	}
}

func (lb *RoundRobinLoadBalancer) Select() *Server {
	lb.mutex.Lock()
	defer lb.mutex.Unlock()
	if len(lb.servers) == 0 {
		return nil
	}
	server := lb.servers[lb.current]
	lb.current = (lb.current + 1) % len(lb.servers)
	return server
}
```
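The rotation is fully deterministic, which makes it easy to verify. A self-contained sketch (`rrBalancer` and `rotation` are illustrative condensed names, not from the implementation above):

```go
package main

import (
	"fmt"
	"sync"
)

// rrBalancer condenses the RoundRobinLoadBalancer above so this snippet
// compiles on its own.
type rrBalancer struct {
	servers []string
	current int
	mu      sync.Mutex
}

func (lb *rrBalancer) selectServer() string {
	lb.mu.Lock()
	defer lb.mu.Unlock()
	if len(lb.servers) == 0 {
		return ""
	}
	s := lb.servers[lb.current]
	lb.current = (lb.current + 1) % len(lb.servers)
	return s
}

// rotation records the servers chosen for n consecutive requests.
func rotation(lb *rrBalancer, n int) []string {
	out := make([]string, 0, n)
	for i := 0; i < n; i++ {
		out = append(out, lb.selectServer())
	}
	return out
}

func main() {
	lb := &rrBalancer{servers: []string{"s1", "s2", "s3"}}
	fmt.Println(rotation(lb, 4)) // cycles s1, s2, s3, then wraps back to s1
}
```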
### Weighted Round Robin

Weighted round robin polls servers according to their weights, so higher-weight servers receive proportionally more requests.
#### Smooth weighted round robin

The smooth variant (as used by nginx) adds each server's effective weight to its current weight every round, picks the server with the largest current weight, then subtracts the total weight from the winner. This interleaves low-weight servers among high-weight ones instead of sending long bursts to one server.

```go
type WeightRoundRobinLoadBalancer struct {
	servers []*WeightServer
	mutex   sync.Mutex
}

type WeightServer struct {
	Server          *Server
	Weight          int
	CurrentWeight   int
	EffectiveWeight int
}

func NewWeightRoundRobinLoadBalancer(servers []*Server) *WeightRoundRobinLoadBalancer {
	weightServers := make([]*WeightServer, len(servers))
	for i, s := range servers {
		weightServers[i] = &WeightServer{
			Server:          s,
			Weight:          s.Weight,
			CurrentWeight:   0,
			EffectiveWeight: s.Weight,
		}
	}
	return &WeightRoundRobinLoadBalancer{
		servers: weightServers,
	}
}

func (lb *WeightRoundRobinLoadBalancer) Select() *Server {
	lb.mutex.Lock()
	defer lb.mutex.Unlock()
	if len(lb.servers) == 0 {
		return nil
	}
	totalWeight := 0
	var selected *WeightServer
	maxCurrentWeight := -1
	for _, ws := range lb.servers {
		ws.CurrentWeight += ws.EffectiveWeight
		totalWeight += ws.EffectiveWeight
		if ws.CurrentWeight > maxCurrentWeight {
			maxCurrentWeight = ws.CurrentWeight
			selected = ws
		}
	}
	if selected != nil {
		selected.CurrentWeight -= totalWeight
		return selected.Server
	}
	return nil
}

// AdjustWeight nudges a server's effective weight up on success and down on
// failure, bounded by [0, Weight].
func (lb *WeightRoundRobinLoadBalancer) AdjustWeight(server *Server, success bool) {
	lb.mutex.Lock()
	defer lb.mutex.Unlock()
	for _, ws := range lb.servers {
		if ws.Server == server {
			if success {
				if ws.EffectiveWeight < ws.Weight {
					ws.EffectiveWeight++
				}
			} else {
				if ws.EffectiveWeight > 0 {
					ws.EffectiveWeight--
				}
			}
			break
		}
	}
}
```
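To see why "smooth" matters, here is a condensed, lock-free sketch of the same selection step (`wserver`, `smoothPick`, and `sequence` are illustrative names, not from the implementation above). With weights 5:1:1 it yields A A B A C A A rather than five consecutive A's:

```go
package main

import (
	"fmt"
	"strings"
)

// wserver condenses the WeightServer bookkeeping above.
type wserver struct {
	name                  string
	weight, currentWeight int
}

// smoothPick runs one round of nginx-style smooth weighted round robin:
// add each weight to its current weight, pick the max, subtract the total.
func smoothPick(servers []*wserver) *wserver {
	total := 0
	var selected *wserver
	for _, ws := range servers {
		ws.currentWeight += ws.weight
		total += ws.weight
		if selected == nil || ws.currentWeight > selected.currentWeight {
			selected = ws
		}
	}
	selected.currentWeight -= total
	return selected
}

// sequence concatenates the names chosen for n consecutive requests.
func sequence(servers []*wserver, n int) string {
	var b strings.Builder
	for i := 0; i < n; i++ {
		b.WriteString(smoothPick(servers).name)
	}
	return b.String()
}

func main() {
	servers := []*wserver{{name: "A", weight: 5}, {name: "B", weight: 1}, {name: "C", weight: 1}}
	fmt.Println(sequence(servers, 7)) // prints "AABACAA"
}
```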
### When to use

- Servers with noticeably different capacities, where weights reflect capacity
- No session affinity requirement
## 3. Least Connections

### How it works

Least connections routes each new request to the server that currently has the fewest active connections.
```mermaid
graph LR
A[New request] --> B{Compare connection counts}
B --> C[Server 1: 5 connections]
B --> D[Server 2: 2 connections]
B --> E[Server 3: 8 connections]
B --> F[Pick Server 2]
style F fill:#51CF66
```
### Characteristics

Pros:

- Takes each server's current load into account
- Slow servers receive fewer requests
- Well suited to long-lived connections

Cons:

- Requires maintaining per-server connection counts
- More complex to implement
- Connection counts must be kept accurate
### Go implementation

```go
type LeastConnectionsLoadBalancer struct {
	servers map[*Server]*ServerStats
	mutex   sync.RWMutex
}

type ServerStats struct {
	Server      *Server
	Connections int
	mutex       sync.Mutex
}

func NewLeastConnectionsLoadBalancer(servers []*Server) *LeastConnectionsLoadBalancer {
	lb := &LeastConnectionsLoadBalancer{
		servers: make(map[*Server]*ServerStats),
	}
	for _, s := range servers {
		lb.servers[s] = &ServerStats{
			Server:      s,
			Connections: 0,
		}
	}
	return lb
}

func (lb *LeastConnectionsLoadBalancer) Select() *Server {
	lb.mutex.RLock()
	defer lb.mutex.RUnlock()
	if len(lb.servers) == 0 {
		return nil
	}
	var selected *Server
	minConnections := int(^uint(0) >> 1) // math.MaxInt
	for server, stats := range lb.servers {
		stats.mutex.Lock()
		connections := stats.Connections
		stats.mutex.Unlock()
		if connections < minConnections {
			minConnections = connections
			selected = server
		}
	}
	return selected
}

func (lb *LeastConnectionsLoadBalancer) IncrementConnections(server *Server) {
	lb.mutex.RLock()
	stats, exists := lb.servers[server]
	lb.mutex.RUnlock()
	if exists {
		stats.mutex.Lock()
		stats.Connections++
		stats.mutex.Unlock()
	}
}

func (lb *LeastConnectionsLoadBalancer) DecrementConnections(server *Server) {
	lb.mutex.RLock()
	stats, exists := lb.servers[server]
	lb.mutex.RUnlock()
	if exists {
		stats.mutex.Lock()
		if stats.Connections > 0 {
			stats.Connections--
		}
		stats.mutex.Unlock()
	}
}
```
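The selection step mirrors the diagram above. A self-contained sketch (`connServer` and `selectLeastConnections` are illustrative condensed names, locking omitted):

```go
package main

import "fmt"

// connServer condenses the per-server connection bookkeeping above.
type connServer struct {
	addr        string
	connections int
}

// selectLeastConnections picks the server with the fewest active connections.
func selectLeastConnections(servers []*connServer) *connServer {
	var selected *connServer
	for _, s := range servers {
		if selected == nil || s.connections < selected.connections {
			selected = s
		}
	}
	return selected
}

func main() {
	servers := []*connServer{
		{addr: "s1", connections: 5},
		{addr: "s2", connections: 2},
		{addr: "s3", connections: 8},
	}
	fmt.Println(selectLeastConnections(servers).addr) // prints "s2"
}
```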
### Weighted Least Connections

The weighted variant compares connections per unit of weight, so a high-weight server may still win even with more raw connections.

```go
type WeightLeastConnectionsLoadBalancer struct {
	servers map[*Server]*WeightServerStats
	mutex   sync.RWMutex
}

type WeightServerStats struct {
	Server      *Server
	Weight      int
	Connections int
	mutex       sync.Mutex
}

func NewWeightLeastConnectionsLoadBalancer(servers []*Server) *WeightLeastConnectionsLoadBalancer {
	lb := &WeightLeastConnectionsLoadBalancer{
		servers: make(map[*Server]*WeightServerStats),
	}
	for _, s := range servers {
		lb.servers[s] = &WeightServerStats{
			Server:      s,
			Weight:      s.Weight,
			Connections: 0,
		}
	}
	return lb
}

func (lb *WeightLeastConnectionsLoadBalancer) Select() *Server {
	lb.mutex.RLock()
	defer lb.mutex.RUnlock()
	if len(lb.servers) == 0 {
		return nil
	}
	var selected *Server
	minRatio := float64(^uint(0) >> 1)
	for server, stats := range lb.servers {
		stats.mutex.Lock()
		connections := stats.Connections
		weight := stats.Weight
		stats.mutex.Unlock()
		if weight == 0 {
			continue
		}
		// Fewest connections per unit of weight wins.
		ratio := float64(connections) / float64(weight)
		if ratio < minRatio {
			minRatio = ratio
			selected = server
		}
	}
	return selected
}
```
### When to use

- Long-lived connections (e.g. WebSocket, persistent TCP)
- Server processing times vary widely
- Dynamic load balancing is needed
## 4. Hash

### How it works

A hash algorithm computes a hash over some attribute of the request (client IP, URL, parameters, and so on) and routes requests with the same hash value to the same server.
### Characteristics

Pros:

- The same request is always routed to the same server
- Supports session affinity
- Simple to implement

Cons:

- Adding or removing servers re-routes a large share of requests
- Load can become uneven
- Hash collisions can skew the distribution
### Source IP hash

```go
type IPHashLoadBalancer struct {
	servers []*Server
}

func NewIPHashLoadBalancer(servers []*Server) *IPHashLoadBalancer {
	return &IPHashLoadBalancer{
		servers: servers,
	}
}

func (lb *IPHashLoadBalancer) Select(clientIP string) *Server {
	if len(lb.servers) == 0 {
		return nil
	}
	hash := lb.hash(clientIP)
	index := hash % len(lb.servers)
	return lb.servers[index]
}

// hash is a simple multiplicative string hash (base 31), clamped non-negative.
func (lb *IPHashLoadBalancer) hash(key string) int {
	hash := 0
	for _, char := range key {
		hash = hash*31 + int(char)
		if hash < 0 {
			hash = -hash
		}
	}
	return hash
}

// fnvHash is an alternative: the 32-bit FNV-1a hash.
func (lb *IPHashLoadBalancer) fnvHash(key string) uint32 {
	hash := uint32(2166136261)
	for _, char := range key {
		hash ^= uint32(char)
		hash *= 16777619
	}
	return hash
}
```
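The session-affinity property follows directly from determinism: the same key always hashes to the same index. A self-contained sketch (`ipHash` and `pick` are illustrative condensed names, not from the implementation above):

```go
package main

import "fmt"

// ipHash condenses the base-31 string hash above.
func ipHash(key string) int {
	h := 0
	for _, c := range key {
		h = h*31 + int(c)
		if h < 0 {
			h = -h
		}
	}
	return h
}

// pick maps a client IP onto one of the servers via hash modulo.
func pick(servers []string, clientIP string) string {
	return servers[ipHash(clientIP)%len(servers)]
}

func main() {
	servers := []string{"s1", "s2", "s3"}
	// The same client IP always lands on the same server.
	fmt.Println(pick(servers, "10.0.0.7") == pick(servers, "10.0.0.7")) // prints "true"
}
```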
### Parameter hash

```go
type ParameterHashLoadBalancer struct {
	servers []*Server
}

func NewParameterHashLoadBalancer(servers []*Server) *ParameterHashLoadBalancer {
	return &ParameterHashLoadBalancer{
		servers: servers,
	}
}

func (lb *ParameterHashLoadBalancer) Select(params map[string]string) *Server {
	if len(lb.servers) == 0 {
		return nil
	}
	key := lb.serializeParams(params)
	hash := lb.hash(key)
	index := hash % len(lb.servers)
	return lb.servers[index]
}

// serializeParams produces a canonical "k=v&k=v" string so the same parameter
// set always hashes identically regardless of map iteration order.
func (lb *ParameterHashLoadBalancer) serializeParams(params map[string]string) string {
	var parts []string
	for k, v := range params {
		parts = append(parts, k+"="+v)
	}
	sort.Strings(parts)
	return strings.Join(parts, "&")
}

func (lb *ParameterHashLoadBalancer) hash(key string) int {
	hash := 0
	for _, char := range key {
		hash = hash*31 + int(char)
		if hash < 0 {
			hash = -hash
		}
	}
	return hash
}
```
### When to use

- Session affinity is required
- Cache affinity is required
- Identical requests must be routed to the same server
## 5. Consistent Hash

### How it works

Consistent hashing maps both servers and requests onto a hash ring; each request is routed to the first server encountered clockwise from its position on the ring.
```mermaid
graph TB
A[Hash ring] --> B[Server 1, hash 100]
A --> C[Server 2, hash 200]
A --> D[Server 3, hash 300]
A --> E[Request 1, hash 150]
A --> F[Request 2, hash 250]
E -->|routes to| C
F -->|routes to| D
style E fill:#FFE66D
style F fill:#FFE66D
style C fill:#51CF66
style D fill:#51CF66
```
### Characteristics

Pros:

- Adding or removing a server only re-routes a small share of requests
- Load stays relatively even
- Virtual nodes improve the evenness of the distribution

Cons:

- More complex to implement
- Load can still be uneven (mitigated by virtual nodes)
### Go implementation

```go
import (
	"hash/crc32"
	"sort"
	"strconv"
	"sync"
)

type ConsistentHashLoadBalancer struct {
	hashRing     map[uint32]*Server
	sortedHashes []uint32
	virtualNodes int
	mutex        sync.RWMutex
}

func NewConsistentHashLoadBalancer(servers []*Server, virtualNodes int) *ConsistentHashLoadBalancer {
	lb := &ConsistentHashLoadBalancer{
		hashRing:     make(map[uint32]*Server),
		sortedHashes: make([]uint32, 0),
		virtualNodes: virtualNodes,
	}
	for _, server := range servers {
		lb.AddServer(server)
	}
	return lb
}

func (lb *ConsistentHashLoadBalancer) AddServer(server *Server) {
	lb.mutex.Lock()
	defer lb.mutex.Unlock()
	for i := 0; i < lb.virtualNodes; i++ {
		virtualKey := server.Address + "#" + strconv.Itoa(i)
		hash := lb.hash(virtualKey)
		lb.hashRing[hash] = server
		lb.sortedHashes = append(lb.sortedHashes, hash)
	}
	sort.Slice(lb.sortedHashes, func(i, j int) bool {
		return lb.sortedHashes[i] < lb.sortedHashes[j]
	})
}

func (lb *ConsistentHashLoadBalancer) RemoveServer(server *Server) {
	lb.mutex.Lock()
	defer lb.mutex.Unlock()
	for i := 0; i < lb.virtualNodes; i++ {
		virtualKey := server.Address + "#" + strconv.Itoa(i)
		hash := lb.hash(virtualKey)
		delete(lb.hashRing, hash)
		index := sort.Search(len(lb.sortedHashes), func(j int) bool {
			return lb.sortedHashes[j] >= hash
		})
		if index < len(lb.sortedHashes) && lb.sortedHashes[index] == hash {
			lb.sortedHashes = append(
				lb.sortedHashes[:index],
				lb.sortedHashes[index+1:]...,
			)
		}
	}
}

func (lb *ConsistentHashLoadBalancer) Select(key string) *Server {
	lb.mutex.RLock()
	defer lb.mutex.RUnlock()
	if len(lb.hashRing) == 0 {
		return nil
	}
	hash := lb.hash(key)
	index := sort.Search(len(lb.sortedHashes), func(i int) bool {
		return lb.sortedHashes[i] >= hash
	})
	if index == len(lb.sortedHashes) {
		index = 0 // wrap around the ring
	}
	serverHash := lb.sortedHashes[index]
	return lb.hashRing[serverHash]
}

func (lb *ConsistentHashLoadBalancer) hash(key string) uint32 {
	return crc32.ChecksumIEEE([]byte(key))
}
```
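A self-contained sketch of the two properties that matter here: lookups are deterministic, and removing a server only moves the keys it owned (`ring` and its methods are illustrative condensed names, locking omitted):

```go
package main

import (
	"fmt"
	"hash/crc32"
	"sort"
	"strconv"
)

// ring condenses the consistent-hash ring above.
type ring struct {
	hashes []uint32
	nodes  map[uint32]string
	vnodes int
}

func newRing(vnodes int) *ring {
	return &ring{nodes: map[uint32]string{}, vnodes: vnodes}
}

func (r *ring) add(server string) {
	for i := 0; i < r.vnodes; i++ {
		h := crc32.ChecksumIEEE([]byte(server + "#" + strconv.Itoa(i)))
		r.nodes[h] = server
		r.hashes = append(r.hashes, h)
	}
	sort.Slice(r.hashes, func(i, j int) bool { return r.hashes[i] < r.hashes[j] })
}

func (r *ring) remove(server string) {
	kept := r.hashes[:0]
	for _, h := range r.hashes {
		if r.nodes[h] == server {
			delete(r.nodes, h)
		} else {
			kept = append(kept, h)
		}
	}
	r.hashes = kept
}

// get walks clockwise from the key's hash to the first virtual node.
func (r *ring) get(key string) string {
	if len(r.hashes) == 0 {
		return ""
	}
	h := crc32.ChecksumIEEE([]byte(key))
	i := sort.Search(len(r.hashes), func(j int) bool { return r.hashes[j] >= h })
	if i == len(r.hashes) {
		i = 0
	}
	return r.nodes[r.hashes[i]]
}

func main() {
	r := newRing(100)
	for _, s := range []string{"s1", "s2", "s3"} {
		r.add(s)
	}
	owner := r.get("user:42")
	fmt.Println(owner == r.get("user:42")) // lookups are deterministic
	r.remove(owner)
	// After removing the owner, the key moves to another live server.
	fmt.Println(r.get("user:42") != owner && r.get("user:42") != "")
}
```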
### When to use

- Distributed cache systems
- Server sets that grow and shrink dynamically
- Minimizing the number of re-routed requests matters
## 6. Least Active

### How it works

Least active routes each request to the server currently processing the fewest in-flight requests.
### Characteristics

Pros:

- Reflects each server's current processing capacity
- Slow servers receive fewer requests
- Well suited to workloads with highly variable processing times

Cons:

- Requires tracking the in-flight request count of every server
- The extra bookkeeping adds overhead on the request path
### Go implementation

```go
type LeastActiveLoadBalancer struct {
	servers map[*Server]*ActiveStats
	mutex   sync.RWMutex
}

type ActiveStats struct {
	Server      *Server
	ActiveCount int
	mutex       sync.Mutex
}

func NewLeastActiveLoadBalancer(servers []*Server) *LeastActiveLoadBalancer {
	lb := &LeastActiveLoadBalancer{
		servers: make(map[*Server]*ActiveStats),
	}
	for _, s := range servers {
		lb.servers[s] = &ActiveStats{
			Server:      s,
			ActiveCount: 0,
		}
	}
	return lb
}

func (lb *LeastActiveLoadBalancer) Select() *Server {
	lb.mutex.RLock()
	defer lb.mutex.RUnlock()
	if len(lb.servers) == 0 {
		return nil
	}
	var selected *Server
	minActive := int(^uint(0) >> 1) // math.MaxInt
	for server, stats := range lb.servers {
		stats.mutex.Lock()
		activeCount := stats.ActiveCount
		stats.mutex.Unlock()
		if activeCount < minActive {
			minActive = activeCount
			selected = server
		}
	}
	return selected
}

func (lb *LeastActiveLoadBalancer) IncrementActive(server *Server) {
	lb.mutex.RLock()
	stats, exists := lb.servers[server]
	lb.mutex.RUnlock()
	if exists {
		stats.mutex.Lock()
		stats.ActiveCount++
		stats.mutex.Unlock()
	}
}

func (lb *LeastActiveLoadBalancer) DecrementActive(server *Server) {
	lb.mutex.RLock()
	stats, exists := lb.servers[server]
	lb.mutex.RUnlock()
	if exists {
		stats.mutex.Lock()
		if stats.ActiveCount > 0 {
			stats.ActiveCount--
		}
		stats.mutex.Unlock()
	}
}
```
### When to use

- Server processing times vary widely
- Dynamic load balancing is needed
- Real-time responsiveness is critical
## 7. Response Time Weighted

### How it works

The balancer adjusts each server's weight based on its average response time, so servers with shorter response times receive more requests.
### Go implementation

```go
type ResponseTimeWeightedLoadBalancer struct {
	servers map[*Server]*ResponseTimeStats
	mutex   sync.RWMutex
}

type ResponseTimeStats struct {
	Server          *Server
	BaseWeight      int
	AvgResponseTime time.Duration
	RequestCount    int64
	mutex           sync.Mutex
}

func NewResponseTimeWeightedLoadBalancer(servers []*Server) *ResponseTimeWeightedLoadBalancer {
	lb := &ResponseTimeWeightedLoadBalancer{
		servers: make(map[*Server]*ResponseTimeStats),
	}
	for _, s := range servers {
		lb.servers[s] = &ResponseTimeStats{
			Server:          s,
			BaseWeight:      s.Weight,
			AvgResponseTime: 100 * time.Millisecond,
			RequestCount:    0,
		}
	}
	return lb
}

func (lb *ResponseTimeWeightedLoadBalancer) Select() *Server {
	lb.mutex.RLock()
	defer lb.mutex.RUnlock()
	if len(lb.servers) == 0 {
		return nil
	}
	totalWeight := 0
	for _, stats := range lb.servers {
		stats.mutex.Lock()
		weight := lb.calculateWeight(stats)
		stats.mutex.Unlock()
		totalWeight += weight
	}
	if totalWeight == 0 {
		// Fall back to a uniform random pick.
		var servers []*Server
		for s := range lb.servers {
			servers = append(servers, s)
		}
		return servers[rand.Intn(len(servers))]
	}
	randomWeight := rand.Intn(totalWeight)
	currentWeight := 0
	for server, stats := range lb.servers {
		stats.mutex.Lock()
		weight := lb.calculateWeight(stats)
		stats.mutex.Unlock()
		currentWeight += weight
		if randomWeight < currentWeight {
			return server
		}
	}
	return nil
}

// calculateWeight scales the base weight by how the server's average response
// time compares to a 100ms baseline: faster servers get a higher weight.
func (lb *ResponseTimeWeightedLoadBalancer) calculateWeight(stats *ResponseTimeStats) int {
	baseResponseTime := 100 * time.Millisecond
	if stats.AvgResponseTime <= 0 {
		return stats.BaseWeight
	}
	ratio := float64(baseResponseTime) / float64(stats.AvgResponseTime)
	weight := int(float64(stats.BaseWeight) * ratio)
	if weight < 1 {
		weight = 1
	}
	return weight
}

func (lb *ResponseTimeWeightedLoadBalancer) RecordResponseTime(
	server *Server,
	responseTime time.Duration,
) {
	lb.mutex.RLock()
	stats, exists := lb.servers[server]
	lb.mutex.RUnlock()
	if !exists {
		return
	}
	stats.mutex.Lock()
	defer stats.mutex.Unlock()
	// Exponentially weighted moving average; alpha weights the old average.
	alpha := 0.3
	if stats.RequestCount == 0 {
		stats.AvgResponseTime = responseTime
	} else {
		stats.AvgResponseTime = time.Duration(
			float64(stats.AvgResponseTime)*alpha + float64(responseTime)*(1-alpha),
		)
	}
	stats.RequestCount++
}
```
### When to use

- Large differences in server performance
- Weights should track real-time performance
- Response-time-sensitive workloads
## Algorithm Comparison

| Algorithm | Implementation Complexity | Balance Quality | Session Affinity | Dynamic Adjustment | Best For |
| --- | --- | --- | --- | --- | --- |
| Random | ⭐ | ⭐⭐⭐ | ❌ | ⭐⭐ | Simple scenarios |
| Weighted Random | ⭐⭐ | ⭐⭐⭐ | ❌ | ⭐⭐⭐ | Large performance differences |
| Round Robin | ⭐ | ⭐⭐⭐ | ❌ | ⭐ | Similar performance |
| Weighted Round Robin | ⭐⭐ | ⭐⭐⭐ | ❌ | ⭐⭐ | Large performance differences |
| Least Connections | ⭐⭐⭐ | ⭐⭐⭐ | ❌ | ⭐⭐⭐ | Long-lived connections |
| Hash | ⭐⭐ | ⭐⭐ | ✅ | ❌ | Session affinity |
| Consistent Hash | ⭐⭐⭐ | ⭐⭐⭐ | ✅ | ⭐⭐⭐ | Distributed caches |
| Least Active | ⭐⭐⭐ | ⭐⭐⭐ | ❌ | ⭐⭐⭐ | Varied processing times |
| Response Time Weighted | ⭐⭐⭐⭐ | ⭐⭐⭐ | ❌ | ⭐⭐⭐⭐ | Performance-sensitive workloads |
## Choosing an Algorithm

- Servers with similar performance: round robin or random
- Large performance differences: weighted round robin or weighted random
- Session affinity required: hash or consistent hash
- Long-lived connections: least connections
- Highly variable processing times: least active
- Weights that track live performance: response time weighted
- Distributed caches: consistent hash
## Best Practices

- Health checks: probe servers regularly and remove unhealthy ones from rotation
- Weight tuning: adjust weights dynamically based on server performance
- Metrics: monitor each algorithm's load distribution and performance
- Fallback: provide a degradation strategy for when no server is available
- Warm-up: ramp up a new server's weight gradually instead of sending it full load at once
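The health-check practice composes naturally with any of the algorithms above: filter the server list before selection. A minimal sketch, assuming a hypothetical `healthy` flag updated by a background prober (the `node` type and `healthySubset` are illustrative names, not from the article; a real checker would probe each server periodically, e.g. via an HTTP health endpoint, and flip the flag):

```go
package main

import "fmt"

// node pairs a server address with the result of its last health probe.
// The healthy flag is assumed to be maintained by a background checker.
type node struct {
	addr    string
	healthy bool
}

// healthySubset filters out servers that failed their last probe, so the
// balancer only ever selects from live servers.
func healthySubset(nodes []*node) []*node {
	var out []*node
	for _, n := range nodes {
		if n.healthy {
			out = append(out, n)
		}
	}
	return out
}

func main() {
	nodes := []*node{
		{addr: "s1", healthy: true},
		{addr: "s2", healthy: false}, // failed its last probe
		{addr: "s3", healthy: true},
	}
	for _, n := range healthySubset(nodes) {
		fmt.Println(n.addr) // only s1 and s3 remain eligible
	}
}
```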
## Summary

Load balancing is a core component of distributed systems, and choosing the right algorithm matters for performance. Pick the algorithm that best fits your actual scenario (server capacity, session requirements, connection types), then keep tuning it against the monitoring data you collect in production.