登录
首页 >  数据库 >  Redis

Redis Stream 消息队列实战:消费组、ACK 和失败重投怎么配

来源:17golang原创

时间:2026-06-13 12:23:41 187浏览 收藏

很多业务一开始只需要一个轻量消息队列:下单后异步发券、支付后同步库存、用户上传后生成缩略图。如果系统已经在用 Redis,Redis Stream 是一个很顺手的选择。它比 List 更适合做“可追踪的消息流”,因为每条消息都有 ID,消费组会记录每个消费者的处理进度,还能把未确认消息留在待确认列表里。

本文用“订单异步处理”做例子,讲清楚 Redis Stream 的三个关键点:消息如何进入 Stream,消费组如何分配任务,Worker 成功后如何 ACK,失败或超时后如何重投。目标不是把每个命令都背下来,而是把生产环境里最容易踩坑的消费闭环搭起来。

摘要

Redis Stream 适合中小规模异步任务、削峰填谷和轻量事件流。稳定使用它的关键是:写入消息要带业务 ID,消费者用消费组读取,处理成功必须 ACK,长时间未确认的消息要定期检查并重投。

适合人群

适合已经了解 Redis 基础数据结构,想用 Redis 做轻量消息队列的后端开发者。示例命令可以直接在本地 Redis 里验证,业务代码接入时再根据语言客户端封装。

目录

  • 为什么选择 Redis Stream
  • 正常消费链路:从 XADD 到 XACK
  • 创建 Stream 和消费组
  • 失败消息:PEL、XPENDING 和重投
  • 生产环境常见坑
  • 总结

为什么选择 Redis Stream

Redis 里常见的队列写法有 List、Pub/Sub 和 Stream。List 简单,但缺少消费组和待确认追踪;Pub/Sub 更像实时广播,订阅方离线时消息容易错过;Stream 则在轻量队列和事件流之间做了一个平衡。

Stream 的几个优势很适合业务异步任务:

  • 每条消息都有递增 ID,方便追踪和排查。
  • 消费组可以让多个 Worker 分摊任务。
  • 消息处理成功后通过 ACK 确认,没确认的会留在 PEL。
  • 失败或超时消息可以被其他消费者认领并重试。

它不是 Kafka 的替代品。如果你需要很大的吞吐、复杂分区、长期日志保存和跨机房复制,应该考虑专门的消息系统。但对于很多单体或中小微服务,Redis Stream 已经足够实用。

正常消费链路:从 XADD 到 XACK

一条正常消息的生命周期是:生产者写入 Stream,消费组把消息分给某个 Worker,Worker 完成业务处理后发送 ACK,Redis 才会把它从待确认列表中移除。

Redis Stream 从生产消息到消费组分配和 ACK 完成的正常消费链路图

先写入一条订单任务:

redis-cli XADD order_stream * order_id 10086 action pay_success amount 199

这条命令会返回一个消息 ID,例如 1781320000000-0。业务上建议把 order_id 这类唯一字段放进消息体,方便消费者做幂等处理。

消费者读取消息时,不要直接用普通读取方式,而是使用消费组:

redis-cli XREADGROUP GROUP order_group worker_a COUNT 10 BLOCK 2000 STREAMS order_stream >

这里最后的 > 表示只读取这个消费组里尚未投递给其他消费者的新消息。Worker 拿到消息后执行业务逻辑,成功后确认:

redis-cli XACK order_stream order_group 1781320000000-0

ACK 是这条链路的终点。只要成功处理但忘记 ACK,消息就会一直留在待确认列表里,后面排查时会看到它像“半路卡住”一样。

创建 Stream 和消费组

第一次使用某个 Stream 时,需要创建消费组。可以用 MKSTREAM 在 Stream 不存在时一并创建:

redis-cli XGROUP CREATE order_stream order_group 0 MKSTREAM

这里的 0 表示从 Stream 里的第一条消息开始消费。如果你只想消费创建消费组之后的新消息,可以把起始 ID 写成 $

redis-cli XGROUP CREATE order_stream order_group $ MKSTREAM

两种选择没有绝对好坏,关键看业务语义:

  • 补偿历史任务:从 0 开始。
  • 只处理新事件:从 $ 开始。
  • 新系统上线前已有积压:先确认是否需要补历史,再决定起点。

失败消息:PEL、XPENDING 和重投

真实业务里,Worker 可能会处理失败、超时、进程退出,或者在 ACK 前崩掉。Redis Stream 不会把这类消息直接丢掉,而是把已投递但未 ACK 的消息放进 PEL,也就是待确认列表。

Redis Stream 消息处理失败进入 PEL 后通过空闲检查和重新认领完成重投的流程图

查看消费组里的待确认情况:

redis-cli XPENDING order_stream order_group

如果想看更细的范围和消费者信息,可以加上 ID 范围和数量:

redis-cli XPENDING order_stream order_group - + 10

当某条消息长时间未确认,可以让另一个消费者认领并重试。Redis 6.2 之后更推荐使用 XAUTOCLAIM,它会按空闲时间扫描待确认消息:

redis-cli XAUTOCLAIM order_stream order_group worker_b 60000 0-0 COUNT 10

上面命令表示:把空闲超过 60 秒的待确认消息转交给 worker_b。Worker 重新处理成功后,仍然要调用 XACK 清理待确认状态。

消费者处理建议

业务代码里建议把消费逻辑拆成四步:拉取消息、幂等判断、处理业务、确认消息。

拉取消息
  -> 根据 order_id 做幂等判断
  -> 处理支付后任务
  -> 成功后 XACK
  -> 失败则记录错误,等待重投任务处理

这里最重要的是幂等。重投意味着同一条消息可能被处理多次,业务侧必须能识别“已经处理过的订单”。常见做法是用订单表状态、唯一流水号或 Redis SETNX 记录处理标记。

生产环境常见坑

第一,忘记 ACK。 处理成功但没 ACK,会导致 PEL 越积越多。上线后要监控 XPENDING 总数,超过阈值就报警。

第二,消息没有业务唯一键。 如果只依赖 Stream ID,业务排查会很痛苦。建议消息体里带上订单号、用户 ID、任务类型等关键字段。

第三,重投间隔太短。 业务正在慢慢处理,另一个 Worker 就把消息抢走,会造成重复处理压力。空闲时间阈值要结合接口耗时、数据库耗时和重试成本设置。

第四,Stream 不做修剪。 如果消息长期不清理,Stream 会持续增长。可以在写入时使用近似修剪,保留最近一段消息用于排查:

redis-cli XADD order_stream MAXLEN ~ 100000 * order_id 10087 action pay_success

第五,把 Stream 当成万能队列。 Redis Stream 很轻巧,但也要考虑 Redis 内存、持久化策略和故障恢复。如果消息不能丢、延迟要求高、积压规模大,需要更完整的消息系统设计。

总结

Redis Stream 的稳定使用重点在消费闭环:XADD 写入消息,XREADGROUP 分配任务,业务处理成功后 XACK,异常消息通过 XPENDING 观察,再用 XAUTOCLAIM 做重投。只要幂等、ACK、待确认监控和消息修剪这几件事做好,它就能承担很多中小型业务里的异步队列需求。

声明:本文转载于:17golang原创 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>