Redis 的发布/订阅(Pub/Sub)是一种消息通信模式,允许消息的发送者(发布者)将消息发送到频道,而不需要知道接收者(订阅者)的具体信息。订阅者可以订阅一个或多个频道,并接收发布到这些频道的消息。
基本概念
Redis Pub/Sub 是一种”发布-订阅”消息模式,包含以下核心概念:
- 发布者(Publisher):向频道发送消息的客户端
- 订阅者(Subscriber):订阅频道并接收消息的客户端
- 频道(Channel):消息传递的通道
核心命令
1. PUBLISH - 发布消息
向指定频道发布消息,返回接收到该消息的订阅者数量。
1 | PUBLISH channel message |
示例:
1 | PUBLISH news "Hello World" |
2. SUBSCRIBE - 订阅频道
订阅一个或多个频道,只接收这些确切频道上的消息。
1 | SUBSCRIBE channel [channel ...] |
示例:
1 | SUBSCRIBE news sports |
3. UNSUBSCRIBE - 取消订阅
取消订阅指定频道,如果不指定频道则取消所有订阅。
1 | UNSUBSCRIBE [channel ...] |
4. PSUBSCRIBE - 模式订阅
使用模式(glob 风格)订阅所有匹配的频道。
1 | PSUBSCRIBE pattern [pattern ...] |
支持的匹配模式:
*:匹配任意字符?:匹配单个字符[ae]:匹配指定字符中的一个\*:转义字符
示例:
1 | PSUBSCRIBE news.* |
5. PUNSUBSCRIBE - 取消模式订阅
取消订阅指定的模式。
1 | PUNSUBSCRIBE [pattern ...] |
6. PUBSUB - 查看订阅信息
查看 Pub/Sub 系统的状态信息。
1 | # 查看指定频道的订阅者数量 |
使用场景
1. 实时聊天系统
用户加入聊天室,订阅对应频道,发送消息时使用 PUBLISH。
1 | # 用户订阅聊天室 |
2. 实时通知系统
系统有多个事件类型,用户可以订阅自己关心的事件。
1 | # 订阅订单相关事件 |
3. 日志收集与监控
多个服务按服务名或模块命名频道,统一处理所有服务的日志。
1 | # 订阅所有服务的日志 |
4. 分布式任务广播
主节点向从节点广播命令或任务。
1 | # 节点订阅命令频道 |
5. 系统事件通知
系统状态变化时通知相关组件。
1 | # 订阅系统事件 |
代码示例
Go 示例
使用 go-redis 库实现订阅发布:
1 | package main |
注意事项与限制
1. 消息不持久化
Pub/Sub 消息不会持久化到磁盘。如果订阅者在消息发布时不在线,它将不会收到那条消息。如果需要持久化、历史记录或可靠性保证,应该考虑使用 Redis Streams。
2. 交付语义
Redis Pub/Sub 提供的是”最多一次”(at-most-once)交付语义:
- 消息发布后不会重试
- 不保证被所有订阅者接收
- 如果订阅者处理消息失败,消息会丢失
3. 订阅状态限制
客户端在使用 SUBSCRIBE 或 PSUBSCRIBE 后会进入订阅状态,此时只能执行以下命令:
SUBSCRIBE、PSUBSCRIBEUNSUBSCRIBE、PUNSUBSCRIBEPING、QUITRESET(RESP3 协议)
其他命令会被拒绝执行。
4. 频道与数据库无关
Pub/Sub 与 Redis 的数据库编号(DB)无关。连接在不同 DB 上的订阅者也能接收到消息,因为 Pub/Sub 是全局的。
5. 性能考虑
PSUBSCRIBE会对所有匹配模式的频道进行检查- 模式越宽泛(如
*),匹配的频道越多,开销越大 - 生产环境中要避免使用过于宽泛的模式,以免匹配过多无关频道
6. 连接断开处理
订阅者需要处理网络断开的情况:
- 实现自动重连机制
- 在重连后重新订阅频道
- 考虑使用 Redis Streams 作为更可靠的替代方案
Redis Streams
Redis Streams 是 Redis 5.0 引入的数据结构,提供了更强大的消息队列功能。与 Pub/Sub 相比,Streams 支持消息持久化、消费者组、消息确认等特性,更适合需要可靠消息传递的场景。
Streams 基本概念
- Stream:一个只追加的日志数据结构,类似于日志文件
- Entry(条目):Stream 中的一条消息,包含唯一的 ID 和键值对数据
- Consumer Group(消费者组):多个消费者协同处理消息的机制
- Consumer(消费者):消费者组中的单个消费者实例
- Pending Entry(待处理条目):已读取但未确认的消息
Streams 核心命令
1. XADD - 添加消息
向 Stream 添加新消息,返回消息 ID。
1 | XADD stream [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...] |
示例:
1 | # 添加消息,* 表示自动生成 ID |
2. XREAD - 读取消息
从 Stream 读取消息,支持阻塞模式。
1 | XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...] |
示例:
1 | # 读取所有新消息 |
3. XGROUP - 消费者组管理
创建、管理消费者组。
1 | # 创建消费者组 |
示例:
1 | # 从 Stream 开头创建消费者组 |
4. XREADGROUP - 消费者组读取
消费者组中的消费者读取消息。
1 | XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...] |
示例:
1 | # 消费者读取未处理的消息 |
5. XACK - 确认消息
确认消息已被处理,从 Pending 列表中移除。
1 | XACK stream group id [id ...] |
示例:
1 | XACK mystream mygroup 1609459200000-0 |
6. XPENDING - 查看待处理消息
查看消费者组中的待处理消息。
1 | XPENDING stream group [start] [end] [count] [consumer] |
示例:
1 | # 查看所有待处理消息 |
7. XCLAIM - 认领消息
将待处理消息重新分配给其他消费者。
1 | XCLAIM stream group consumer min-idle-time id [id ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [FORCE] [JUSTID] |
8. XRANGE / XREVRANGE - 范围查询
按 ID 范围查询消息。
1 | # 正序查询 |
示例:
1 | # 查询所有消息 |
9. XDEL - 删除消息
删除 Stream 中的消息。
1 | XDEL stream id [id ...] |
10. XLEN - 获取长度
获取 Stream 中的消息数量。
1 | XLEN stream |
Streams Go 代码示例
基础示例:生产者与消费者
1 | package main |
消费者组示例
1 | package main |
限制 Stream 长度示例
1 | package main |
范围查询示例
1 | package main |
Streams 使用场景
- 消息队列:可靠的消息传递,支持消费者组和消息确认
- 事件溯源:记录所有事件的历史,支持回放和审计
- 日志收集:收集和存储应用日志,支持多消费者处理
- 实时数据流处理:处理实时数据流,支持多个处理节点
- 任务队列:分布式任务队列,支持任务重试和负载均衡
Streams 最佳实践
- 使用消费者组:对于需要可靠处理的场景,使用消费者组而不是直接 XREAD
- 及时确认消息:处理完消息后及时 XACK,避免消息堆积
- 处理待处理消息:定期检查和处理 XPENDING 中的消息,实现重试机制
- 限制 Stream 长度:使用 MAXLEN 限制 Stream 大小,避免内存溢出
- 监控 Pending 列表:监控待处理消息数量,及时发现处理瓶颈
- 使用近似 MAXLEN:使用
~参数提高性能,在可接受的精度损失下获得更好的性能
Pub/Sub vs Streams
| 特性 | Pub/Sub | Streams |
|---|---|---|
| 消息持久化 | ❌ 不持久化 | ✅ 持久化 |
| 消息历史 | ❌ 无历史记录 | ✅ 可查看历史消息 |
| 消费者组 | ❌ 不支持 | ✅ 支持消费者组 |
| 消息确认 | ❌ 不支持 | ✅ 支持 ACK 机制 |
| 消息重试 | ❌ 不支持 | ✅ 支持(通过 Pending 列表) |
| 负载均衡 | ❌ 不支持 | ✅ 支持(消费者组) |
| 性能 | ✅ 更高 | ⚠️ 相对较低 |
| 使用场景 | 实时通知、简单消息传递 | 消息队列、日志收集、事件溯源 |
最佳实践
选择合适的模式:如果频道数量有限且固定,使用
SUBSCRIBE;如果需要动态匹配多个频道,使用PSUBSCRIBE。错误处理:实现完善的错误处理和重连机制,确保订阅者能够从网络故障中恢复。
消息格式:使用 JSON 等结构化格式存储消息,便于解析和处理。
监控订阅状态:使用
PUBSUB命令监控频道的订阅者数量和活跃频道。考虑使用 Streams:对于需要可靠性和持久化的场景,考虑使用 Redis Streams 替代 Pub/Sub。
总结
Redis 提供了两种消息传递机制:
Pub/Sub(发布/订阅):提供了一种简单高效的消息传递机制,适用于实时通知、聊天系统、事件广播等场景。但 Pub/Sub 不提供消息持久化和可靠性保证,消息发布时如果订阅者不在线,消息会丢失。
Streams(流):提供了更强大的消息队列功能,支持消息持久化、消费者组、消息确认、消息重试等特性。适用于需要可靠消息传递的场景,如消息队列、日志收集、事件溯源等。
选择建议:
- 如果只需要简单的实时消息传递,且可以接受消息丢失,使用 Pub/Sub
- 如果需要可靠的消息传递、消息历史、负载均衡等特性,使用 Streams