Redis订阅发布


Redis 的发布/订阅(Pub/Sub)是一种消息通信模式,允许消息的发送者(发布者)将消息发送到频道,而不需要知道接收者(订阅者)的具体信息。订阅者可以订阅一个或多个频道,并接收发布到这些频道的消息。

基本概念

Redis Pub/Sub 是一种”发布-订阅”消息模式,包含以下核心概念:

  • 发布者(Publisher):向频道发送消息的客户端
  • 订阅者(Subscriber):订阅频道并接收消息的客户端
  • 频道(Channel):消息传递的通道

核心命令

1. PUBLISH - 发布消息

向指定频道发布消息,返回接收到该消息的订阅者数量。

1
PUBLISH channel message

示例

1
2
PUBLISH news "Hello World"
# 返回接收到消息的订阅者数量

2. SUBSCRIBE - 订阅频道

订阅一个或多个频道,只接收这些确切频道上的消息。

1
SUBSCRIBE channel [channel ...]

示例

1
SUBSCRIBE news sports

3. UNSUBSCRIBE - 取消订阅

取消订阅指定频道,如果不指定频道则取消所有订阅。

1
UNSUBSCRIBE [channel ...]

4. PSUBSCRIBE - 模式订阅

使用模式(glob 风格)订阅所有匹配的频道。

1
PSUBSCRIBE pattern [pattern ...]

支持的匹配模式

  • *:匹配任意字符
  • ?:匹配单个字符
  • [ae]:匹配指定字符中的一个
  • \*:转义字符

示例

1
2
PSUBSCRIBE news.*
# 会匹配 news.sports、news.tech、news.us.ny 等

5. PUNSUBSCRIBE - 取消模式订阅

取消订阅指定的模式。

1
PUNSUBSCRIBE [pattern ...]

6. PUBSUB - 查看订阅信息

查看 Pub/Sub 系统的状态信息。

1
2
3
4
5
6
7
8
# 查看指定频道的订阅者数量
PUBSUB NUMSUB channel [channel ...]

# 查看匹配模式的订阅者数量
PUBSUB NUMPAT

# 查看活跃频道列表(至少有一个订阅者)
PUBSUB CHANNELS [pattern]

使用场景

1. 实时聊天系统

用户加入聊天室,订阅对应频道,发送消息时使用 PUBLISH

1
2
3
4
5
# 用户订阅聊天室
SUBSCRIBE room:12345

# 发送消息到聊天室
PUBLISH room:12345 "Hello everyone!"

2. 实时通知系统

系统有多个事件类型,用户可以订阅自己关心的事件。

1
2
3
4
5
# 订阅订单相关事件
SUBSCRIBE order.created order.updated order.cancelled

# 或者使用模式订阅所有订单事件
PSUBSCRIBE order.*

3. 日志收集与监控

多个服务按服务名或模块命名频道,统一处理所有服务的日志。

1
2
3
4
5
# 订阅所有服务的日志
PSUBSCRIBE *.log

# 订阅特定服务的日志
PSUBSCRIBE serviceA.*

4. 分布式任务广播

主节点向从节点广播命令或任务。

1
2
3
4
5
# 节点订阅命令频道
SUBSCRIBE node:commands

# 主节点发布命令
PUBLISH node:commands "update_config"

5. 系统事件通知

系统状态变化时通知相关组件。

1
2
3
4
5
6
# 订阅系统事件
PSUBSCRIBE system.*

# 发布系统事件
PUBLISH system.startup "Server started"
PUBLISH system.shutdown "Server shutting down"

代码示例

Go 示例

使用 go-redis 库实现订阅发布:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package main

import (
"context"
"fmt"
"time"

"github.com/redis/go-redis/v9"
)

func subscriber() {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})

pubsub := rdb.Subscribe(ctx, "news", "sports")
defer pubsub.Close()

// 模式订阅
patternPubsub := rdb.PSubscribe(ctx, "order.*")
defer patternPubsub.Close()

fmt.Println("订阅者已启动,等待消息...")

ch := pubsub.Channel()
patternCh := patternPubsub.Channel()

for {
select {
case msg := <-ch:
fmt.Printf("[频道 %s] 收到消息: %s\n", msg.Channel, msg.Payload)
case msg := <-patternCh:
fmt.Printf("[模式 %s 匹配频道 %s] 收到消息: %s\n",
msg.Pattern, msg.Channel, msg.Payload)
}
}
}

func publisher() {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})

time.Sleep(1 * time.Second) // 等待订阅者启动

rdb.Publish(ctx, "news", "Breaking news: Redis 7.0 released!")
rdb.Publish(ctx, "sports", "Game result: Team A wins!")
rdb.Publish(ctx, "order.created", "Order #12345 created")
rdb.Publish(ctx, "order.updated", "Order #12345 updated")

fmt.Println("消息已发布")
}

func main() {
go subscriber()
publisher()
time.Sleep(2 * time.Second)
}

注意事项与限制

1. 消息不持久化

Pub/Sub 消息不会持久化到磁盘。如果订阅者在消息发布时不在线,它将不会收到那条消息。如果需要持久化、历史记录或可靠性保证,应该考虑使用 Redis Streams

2. 交付语义

Redis Pub/Sub 提供的是”最多一次”(at-most-once)交付语义:

  • 消息发布后不会重试
  • 不保证被所有订阅者接收
  • 如果订阅者处理消息失败,消息会丢失

3. 订阅状态限制

客户端在使用 SUBSCRIBEPSUBSCRIBE 后会进入订阅状态,此时只能执行以下命令:

  • SUBSCRIBEPSUBSCRIBE
  • UNSUBSCRIBEPUNSUBSCRIBE
  • PINGQUIT
  • RESET(RESP3 协议)

其他命令会被拒绝执行。

4. 频道与数据库无关

Pub/Sub 与 Redis 的数据库编号(DB)无关。连接在不同 DB 上的订阅者也能接收到消息,因为 Pub/Sub 是全局的。

5. 性能考虑

  • PSUBSCRIBE 会对所有匹配模式的频道进行检查
  • 模式越宽泛(如 *),匹配的频道越多,开销越大
  • 生产环境中要避免使用过于宽泛的模式,以免匹配过多无关频道

6. 连接断开处理

订阅者需要处理网络断开的情况:

  • 实现自动重连机制
  • 在重连后重新订阅频道
  • 考虑使用 Redis Streams 作为更可靠的替代方案

Redis Streams

Redis Streams 是 Redis 5.0 引入的数据结构,提供了更强大的消息队列功能。与 Pub/Sub 相比,Streams 支持消息持久化、消费者组、消息确认等特性,更适合需要可靠消息传递的场景。

Streams 基本概念

  • Stream:一个只追加的日志数据结构,类似于日志文件
  • Entry(条目):Stream 中的一条消息,包含唯一的 ID 和键值对数据
  • Consumer Group(消费者组):多个消费者协同处理消息的机制
  • Consumer(消费者):消费者组中的单个消费者实例
  • Pending Entry(待处理条目):已读取但未确认的消息

Streams 核心命令

1. XADD - 添加消息

向 Stream 添加新消息,返回消息 ID。

1
XADD stream [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]

示例

1
2
3
4
5
6
7
8
# 添加消息,* 表示自动生成 ID
XADD mystream * name "Alice" age 30

# 指定消息 ID
XADD mystream 1000-0 name "Bob" age 25

# 限制 Stream 长度
XADD mystream MAXLEN ~ 1000 * name "Charlie" age 35

2. XREAD - 读取消息

从 Stream 读取消息,支持阻塞模式。

1
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]

示例

1
2
3
4
5
6
7
8
# 读取所有新消息
XREAD STREAMS mystream 0

# 阻塞读取,最多等待 1000ms
XREAD BLOCK 1000 STREAMS mystream $

# 从多个 Stream 读取
XREAD STREAMS stream1 stream2 0 0

3. XGROUP - 消费者组管理

创建、管理消费者组。

1
2
3
4
5
6
7
8
9
10
11
# 创建消费者组
XGROUP CREATE stream groupname id|$ [MKSTREAM] [ENTRIESREAD entries-read]

# 删除消费者组
XGROUP DESTROY stream groupname

# 删除消费者
XGROUP DELCONSUMER stream groupname consumername

# 设置消费者组的最后 ID
XGROUP SETID stream groupname id|$

示例

1
2
3
4
5
# 从 Stream 开头创建消费者组
XGROUP CREATE mystream mygroup 0

# 从最新消息开始创建消费者组
XGROUP CREATE mystream mygroup $ MKSTREAM

4. XREADGROUP - 消费者组读取

消费者组中的消费者读取消息。

1
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]

示例

1
2
3
4
5
# 消费者读取未处理的消息
XREADGROUP GROUP mygroup consumer1 STREAMS mystream >

# 读取特定 ID 的消息
XREADGROUP GROUP mygroup consumer1 STREAMS mystream 0

5. XACK - 确认消息

确认消息已被处理,从 Pending 列表中移除。

1
XACK stream group id [id ...]

示例

1
XACK mystream mygroup 1609459200000-0

6. XPENDING - 查看待处理消息

查看消费者组中的待处理消息。

1
XPENDING stream group [start] [end] [count] [consumer]

示例

1
2
3
4
5
# 查看所有待处理消息
XPENDING mystream mygroup

# 查看特定消费者的待处理消息
XPENDING mystream mygroup - + 10 consumer1

7. XCLAIM - 认领消息

将待处理消息重新分配给其他消费者。

1
XCLAIM stream group consumer min-idle-time id [id ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [FORCE] [JUSTID]

8. XRANGE / XREVRANGE - 范围查询

按 ID 范围查询消息。

1
2
3
4
5
# 正序查询
XRANGE stream start end [COUNT count]

# 倒序查询
XREVRANGE stream end start [COUNT count]

示例

1
2
3
4
5
# 查询所有消息
XRANGE mystream - +

# 查询指定范围的消息
XRANGE mystream 1609459200000-0 1609459300000-0

9. XDEL - 删除消息

删除 Stream 中的消息。

1
XDEL stream id [id ...]

10. XLEN - 获取长度

获取 Stream 中的消息数量。

1
XLEN stream

Streams Go 代码示例

基础示例:生产者与消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package main

import (
"context"
"fmt"
"time"

"github.com/redis/go-redis/v9"
)

// 生产者:向 Stream 添加消息
func producer() {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})

for i := 0; i < 10; i++ {
// 添加消息,使用 * 自动生成 ID
id, err := rdb.XAdd(ctx, &redis.XAddArgs{
Stream: "mystream",
Values: map[string]interface{}{
"message": fmt.Sprintf("Message %d", i),
"timestamp": time.Now().Unix(),
},
}).Result()

if err != nil {
fmt.Printf("发布消息失败: %v\n", err)
continue
}

fmt.Printf("消息已发布,ID: %s\n", id)
time.Sleep(500 * time.Millisecond)
}
}

// 消费者:从 Stream 读取消息
func consumer() {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})

// 从 Stream 开头读取
lastID := "0"

for {
// 阻塞读取,最多等待 1 秒
streams, err := rdb.XRead(ctx, &redis.XReadArgs{
Streams: []string{"mystream", lastID},
Block: time.Second,
Count: 10,
}).Result()

if err != nil {
if err == redis.Nil {
continue
}
fmt.Printf("读取消息失败: %v\n", err)
continue
}

for _, stream := range streams {
for _, message := range stream.Messages {
fmt.Printf("收到消息 ID: %s, 内容: %v\n",
message.ID, message.Values)
lastID = message.ID
}
}
}
}

func main() {
go consumer()
time.Sleep(1 * time.Second)
producer()
time.Sleep(2 * time.Second)
}

消费者组示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
package main

import (
"context"
"fmt"
"time"

"github.com/redis/go-redis/v9"
)

// 创建消费者组
func createConsumerGroup(rdb *redis.Client, stream, group string) error {
ctx := context.Background()

// 尝试创建消费者组,从 Stream 开头开始
err := rdb.XGroupCreate(ctx, stream, group, "0").Err()
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
return err
}
return nil
}

// 生产者
func producer() {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})

for i := 0; i < 20; i++ {
id, err := rdb.XAdd(ctx, &redis.XAddArgs{
Stream: "orders",
Values: map[string]interface{}{
"order_id": fmt.Sprintf("order-%d", i),
"amount": 100 + i*10,
"status": "pending",
},
}).Result()

if err != nil {
fmt.Printf("发布消息失败: %v\n", err)
continue
}

fmt.Printf("订单已发布,ID: %s\n", id)
time.Sleep(300 * time.Millisecond)
}
}

// 消费者组中的消费者
func consumerGroupWorker(consumerName string) {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})

stream := "orders"
group := "order-processors"

// 创建消费者组
if err := createConsumerGroup(rdb, stream, group); err != nil {
fmt.Printf("创建消费者组失败: %v\n", err)
return
}

fmt.Printf("消费者 %s 已启动\n", consumerName)

for {
// 从消费者组读取消息,> 表示只读取未分配给其他消费者的消息
streams, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: group,
Consumer: consumerName,
Streams: []string{stream, ">"},
Block: time.Second,
Count: 1,
}).Result()

if err != nil {
if err == redis.Nil {
continue
}
fmt.Printf("消费者 %s 读取消息失败: %v\n", consumerName, err)
time.Sleep(time.Second)
continue
}

for _, stream := range streams {
for _, message := range stream.Messages {
fmt.Printf("消费者 %s 处理消息 ID: %s, 内容: %v\n",
consumerName, message.ID, message.Values)

// 模拟处理时间
time.Sleep(500 * time.Millisecond)

// 确认消息已处理
err := rdb.XAck(ctx, stream, group, message.ID).Err()
if err != nil {
fmt.Printf("确认消息失败: %v\n", err)
} else {
fmt.Printf("消费者 %s 已确认消息 %s\n", consumerName, message.ID)
}
}
}
}
}

// 处理待处理消息(重新处理失败的消息)
func processPendingMessages(consumerName string) {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})

stream := "orders"
group := "order-processors"

for {
// 查看待处理消息
pending, err := rdb.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: stream,
Group: group,
Start: "-",
End: "+",
Count: 10,
}).Result()

if err != nil {
fmt.Printf("查看待处理消息失败: %v\n", err)
time.Sleep(5 * time.Second)
continue
}

if len(pending) == 0 {
time.Sleep(5 * time.Second)
continue
}

// 处理每个待处理消息
for _, p := range pending {
// 如果消息空闲时间超过 5 秒,重新认领
if p.Idle > 5*time.Second {
messages, err := rdb.XClaim(ctx, &redis.XClaimArgs{
Stream: stream,
Group: group,
Consumer: consumerName,
MinIdle: 5 * time.Second,
Messages: []string{p.ID},
}).Result()

if err != nil {
fmt.Printf("认领消息失败: %v\n", err)
continue
}

for _, msg := range messages {
fmt.Printf("消费者 %s 重新处理消息 ID: %s, 内容: %v\n",
consumerName, msg.ID, msg.Values)

// 处理消息
time.Sleep(500 * time.Millisecond)

// 确认消息
rdb.XAck(ctx, stream, group, msg.ID)
}
}
}

time.Sleep(5 * time.Second)
}
}

func main() {
// 启动多个消费者
go consumerGroupWorker("consumer-1")
go consumerGroupWorker("consumer-2")
go consumerGroupWorker("consumer-3")

// 启动待处理消息处理器
go processPendingMessages("consumer-1")

time.Sleep(1 * time.Second)

// 启动生产者
producer()

// 保持程序运行
time.Sleep(10 * time.Second)
}

限制 Stream 长度示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package main

import (
"context"
"fmt"
"time"

"github.com/redis/go-redis/v9"
)

func producerWithMaxLen() {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})

for i := 0; i < 100; i++ {
// 使用 MAXLEN 限制 Stream 长度,~ 表示近似值(性能更好)
id, err := rdb.XAdd(ctx, &redis.XAddArgs{
Stream: "limited-stream",
MaxLen: 10, // 最多保留 10 条消息
Approx: true, // 使用近似值,提高性能
Values: map[string]interface{}{
"index": i,
"data": fmt.Sprintf("data-%d", i),
},
}).Result()

if err != nil {
fmt.Printf("发布消息失败: %v\n", err)
continue
}

// 获取当前 Stream 长度
length, _ := rdb.XLen(ctx, "limited-stream").Result()
fmt.Printf("消息 ID: %s, Stream 长度: %d\n", id, length)

time.Sleep(100 * time.Millisecond)
}
}

func main() {
producerWithMaxLen()
}

范围查询示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package main

import (
"context"
"fmt"

"github.com/redis/go-redis/v9"
)

func rangeQuery() {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})

// 查询所有消息
messages, err := rdb.XRange(ctx, "mystream", "-", "+").Result()
if err != nil {
fmt.Printf("查询失败: %v\n", err)
return
}

fmt.Printf("共找到 %d 条消息:\n", len(messages))
for _, msg := range messages {
fmt.Printf("ID: %s, 内容: %v\n", msg.ID, msg.Values)
}

// 查询指定 ID 范围的消息
if len(messages) >= 2 {
startID := messages[0].ID
endID := messages[len(messages)-1].ID

rangeMessages, err := rdb.XRange(ctx, "mystream", startID, endID).Result()
if err == nil {
fmt.Printf("\n范围查询 (%s 到 %s) 找到 %d 条消息\n",
startID, endID, len(rangeMessages))
}
}

// 倒序查询(最新的消息在前)
revMessages, err := rdb.XRevRangeN(ctx, "mystream", "+", "-", 10).Result()
if err == nil {
fmt.Printf("\n最新的 10 条消息:\n")
for _, msg := range revMessages {
fmt.Printf("ID: %s, 内容: %v\n", msg.ID, msg.Values)
}
}
}

func main() {
rangeQuery()
}

Streams 使用场景

  1. 消息队列:可靠的消息传递,支持消费者组和消息确认
  2. 事件溯源:记录所有事件的历史,支持回放和审计
  3. 日志收集:收集和存储应用日志,支持多消费者处理
  4. 实时数据流处理:处理实时数据流,支持多个处理节点
  5. 任务队列:分布式任务队列,支持任务重试和负载均衡

Streams 最佳实践

  1. 使用消费者组:对于需要可靠处理的场景,使用消费者组而不是直接 XREAD
  2. 及时确认消息:处理完消息后及时 XACK,避免消息堆积
  3. 处理待处理消息:定期检查和处理 XPENDING 中的消息,实现重试机制
  4. 限制 Stream 长度:使用 MAXLEN 限制 Stream 大小,避免内存溢出
  5. 监控 Pending 列表:监控待处理消息数量,及时发现处理瓶颈
  6. 使用近似 MAXLEN:使用 ~ 参数提高性能,在可接受的精度损失下获得更好的性能

Pub/Sub vs Streams

特性 Pub/Sub Streams
消息持久化 ❌ 不持久化 ✅ 持久化
消息历史 ❌ 无历史记录 ✅ 可查看历史消息
消费者组 ❌ 不支持 ✅ 支持消费者组
消息确认 ❌ 不支持 ✅ 支持 ACK 机制
消息重试 ❌ 不支持 ✅ 支持(通过 Pending 列表)
负载均衡 ❌ 不支持 ✅ 支持(消费者组)
性能 ✅ 更高 ⚠️ 相对较低
使用场景 实时通知、简单消息传递 消息队列、日志收集、事件溯源

最佳实践

  1. 选择合适的模式:如果频道数量有限且固定,使用 SUBSCRIBE;如果需要动态匹配多个频道,使用 PSUBSCRIBE

  2. 错误处理:实现完善的错误处理和重连机制,确保订阅者能够从网络故障中恢复。

  3. 消息格式:使用 JSON 等结构化格式存储消息,便于解析和处理。

  4. 监控订阅状态:使用 PUBSUB 命令监控频道的订阅者数量和活跃频道。

  5. 考虑使用 Streams:对于需要可靠性和持久化的场景,考虑使用 Redis Streams 替代 Pub/Sub。

总结

Redis 提供了两种消息传递机制:

  1. Pub/Sub(发布/订阅):提供了一种简单高效的消息传递机制,适用于实时通知、聊天系统、事件广播等场景。但 Pub/Sub 不提供消息持久化和可靠性保证,消息发布时如果订阅者不在线,消息会丢失。

  2. Streams(流):提供了更强大的消息队列功能,支持消息持久化、消费者组、消息确认、消息重试等特性。适用于需要可靠消息传递的场景,如消息队列、日志收集、事件溯源等。

选择建议:

  • 如果只需要简单的实时消息传递,且可以接受消息丢失,使用 Pub/Sub
  • 如果需要可靠的消息传递、消息历史、负载均衡等特性,使用 Streams

参考文献

Pub/Sub

Streams


文章作者: djaigo
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 djaigo !
评论
  目录