当前位置: 代码网 > it编程>数据库>Redis > 如何使用redis的stream数据类型做消息队列

如何使用redis的stream数据类型做消息队列

2025年06月26日 Redis 我要评论
在redis5.0之前,如果想使用它作为简单的消息队列,最好的选择就是自身提供的pub/sub模式.它支持简单的发布/订阅模式,发布一个channel绑定一条消息,然后可以有多个消费者监听这个chan

在redis5.0之前,如果想使用它作为简单的消息队列,最好的选择就是自身提供的pub/sub模式.它支持简单的发布/订阅模式,发布一个channel绑定一条消息,然后可以有多个消费者监听这个channel,每个消费者都能收到相同的消息。不支持持久化,不支持查询,不支持分组,不支持分片消费,也没有提供很好的监控手段(有简单的pubsub容器命令,可以看有哪些channel,订阅者数量等)。但是5.0之后,倘若我们人仍选择redis作为简单消息队列,就可以使用新的数据类型stream

stream数据类型介绍

数据类型基础说明

  • 可以理解为一个有时间序列的一组数据集合,每一条新增的数据都是追加到数据集末尾,每一条数据都有自己的唯一id
  • 底层数据结构是基数树
  • 一个stream可以有多个消费者分组group,每一个group也可以有多个消费者consumer,支持分片读取,全部读取,按照id分段读取
  • 随机访问时间复杂度是o(1),向流中添加一个条目的时间为o(1)。 访问任意一项的时间为o(n),其中n是id的长度.

常用命令及详解

  • xadd 向指定的 stream 添加一条新消息。
    xadd key [maxlen [~] count] * field1 value1 [field2 value2 ...]参数说明:
    key:stream 的名称。
    maxlen [~] count:可选,限制 stream 最大长度,超出自动裁剪最老消息。~ 表示近似修剪,性能更优。【实际上使用要注意,超过最大值直接丢弃,也就是“消失了“】
    *:让 redis 自动生成消息id,也可自定义id。
    field value:消息体的键值对。
    用法举例:xadd mystream * name alice age 20

  • xrange 按id范围读取 stream 中的消息
    xrange key start end [count count]参数说明:
    start、end:起止id,- 表示最小id,+ 表示最大id。
    count:可选,限制返回条数。
    用法举例:xrange mystream - + # 读取所有消息

  • xread 从一个或多个 stream 读取新消息,可阻塞等待
    xread [block milliseconds] streams key [key ...] id [id ...]参数说明:
    block:可选,阻塞等待新消息的毫秒数。
    streams:后面跟 stream 名称和起始id。
    用法举例:xread block 5000 streams mystream $ $ 表示只读新消息

  • xgroup 创建、删除、管理 stream 的消费者组。
    xgroup create mystream mygroup 0-0 mkstream常用子命令:

    • 创建组:xgroup create mystream mygroup 0-0 mkstream0-0:从头消费;$:只消费新消息。
      mkstream:stream 不存在时自动创建
    • 删除组:xgroup destroy mystream mygroup
    • 创建消费者、删除消费者。一般不需要,会自动创建
      xgroup createconsumer mystream mygroup consumer-1xgroup createconsumer mystream mygroup consumer-1
  • xreadgroup 以消费者组身份读取消息,实现分布式并发消费
    xreadgroup group group consumer [block milliseconds] streams key [key ...] id [id ...]参数说明:
    group group consumer:指定组名和消费者名。
    id:> 表示只读未分配的新消息,其他id(如0)可用于补偿pending。
    举例:xreadgroup group mygroup consumer-1 block 5000 streams mystream >

  • xpending 查看某个组下所有未ack的消息(即已分配但未确认)注意这里不是消息的快照,它只是存储消息的id列表,并不会复制一份消息内容xpending key group [start end count [consumer]]举例:xpending mystream mygroup - + 10
    xpending mystream mygroup - + 10 consumer-1

  • xack 用于确认消息已被消费,也就是从pending状态pel中移除
    举例:xack mystream mygroup 1680000000000-0

  • xclaim/xautoclaim 将长时间未ack的pending消息转移到其他消费者/实现自动补偿。
    举例:xclaim mystream mygroup consumer-2 60000 1680000000000-0xautoclaim mystream mygroup consumer-2 60000 0-0 count 10

  • xtrim 限制流的最大长度,自动删除最老的消息。无论是否被ack的消息,都会被裁减。
    语法:xtrim key maxlen [~] count
    举例:xtrim mystream maxlen ~ 1000

  • xdel 从stream中删除指定id的消息,可以一次删除多个,用空格隔开即可
    xdel mystream 1680000000000-0

实际使用场景

可用作消息队列

  • 当需要一个轻量级的、安全性要求比较低、可靠性不要求那么高的一个消息队列时,使用stream就很合适,性能也非常不错,单机能支持每秒几十万的写入
  • 典型场景:订单异步处理、短信/邮件通知、日志收集、任务分发等

可以作为事件总线

  • 作为事件总线,支撑微服务间的事件发布与订阅,作为事件源(例如,跟踪用户操作、点击等)。
  • 例如:用户注册事件、支付完成事件等,多个服务可并发消费

延迟队列/死信队列

  • 利用 stream 的 pending/ack/xclaim 机制实现可靠的延迟消息、死信消息补偿。

实时数据流处理

  • iot、监控、风控等场景下,设备/传感器数据实时写入 stream,后端实时消费分析。
  • 支持高并发写入和多消费者并发处理

重要说明

关于持久化和消息删除

  • 消息是默认就持久化的,并且并不提供设置过期时间,那么如果在消息量大且请求量大的情况下,会占用很多内存
  • 如果在新增消息的时候使用maxlen选项限定了stream的长度,那么一定要考虑使用多个consumer,而且要提供一定的处理机制在某些consumer不可用的时候,将消息xclaim到可用的消费者。避免超过限定长度后,丢失消息。
  • 不推荐每次消费完成后使用xdel去删除,而是采用xtrim收缩,结合xinfo、xlen等命令定期检测stream的长度,然后根据实际情况设置合理的收缩长度,定期的清理不再使用的消息。因为即使使用xdel取删除消息,在当前的实现中,直到宏节点完全为空时才真正回收内存

读取的阻塞和非阻塞

  • xrange 、xread 或 xreadgroup ,没有block选项时,像任何其他redis命令一样同步调用,此时他们就是同步命令;如果加上block选项就时非阻塞的,等待指定的毫秒直到有可以消费的消息并立即返回

插入的性能

  • xadd 非常快,如果使用流水线,在普通机器中每秒可以轻松插入50万到100万项
  • 以下是官网提供的延迟测试结果:【在这里,我们每次迭代最多处理10k条消息,这意味着 xreadgroup 的 count 参数被设置为10000。这增加了大量的延迟,但为了让缓慢的消费者能够跟上消息流,这是必需的。因此,你可以预期真实世界的延迟要小得多】
results obtained: 结果:
		processed between 0 and 1 ms -> 74.11%
		processed between 1 and 2 ms -> 25.80%
		processed between 2 and 3 ms -> 0.06%
		processed between 3 and 4 ms -> 0.01%
		processed between 4 and 5 ms -> 0.02%
		因此,99.9%的请求的延迟<= 2毫秒,异常值仍然非常接近平均值。
  • 另外需要注意的是,从redis 6.2.0版本开始,才增加了 idle 选项和独占范围间隔,虽然5.0就引入了stream数据类型

消费者组

  • 何时不需要消费者组:如果你有一个数据流和多个客户端,而且你希望所有客户端都能收到所有信息,那么你就不需要消费者组。
  • 如果你有一个数据流和多个客户端,而且你希望在客户端之间对数据流进行分区或分片,以便每个客户端都能获得到达数据流的消息的子集,那么你就需要一个消费者组。
  • 当使用 xreadgroup 读取时,服务器将记录哪些消息给到了哪些消费者:消息将存储在使用者组内的 pending entries list (pel) 中,该列表是已传递但尚未确认的消息 id 列表。
  • 当实际场景是:可靠性不是必需的,并且偶尔的消息丢失是可以接受的情况下,可以使用 noack 子命令来避免将消息添加到 pel。这相当于在读取消息时确认消息(自动ack)。
  • 使用 xreadgroup 时,在 streams 选项中指定的 id 可以是以下两种之一:

    特殊的 > id,表示消费者只想接收从未发送给其他消费者的信息。它的意思是,给我新邮件。

    任何其他 id,即 0 或任何其他有效 id 或不完整 id(仅毫秒时间部分),都将导致返回发送命令的用户的待处理条目,且 id 大于所提供的 id。因此,基本上如果 id 不大于,那么命令将只允许客户访问其待处理条目:已向其发送但尚未确认的信息。请注意,在这种情况下,block 和 noack 都会被忽略。

属于pel中的消息可以删除吗

pending状态的消息是可以被删除的,redis并没有设计未确认的消息不允许删除。如果采用xdel删除消息后,pending列表将仍然保留待消费消息的id,但是消息内容没有了。因此,在读取此类pel条目时,redis会返回一个空值。

一个stream的一个group多个consumer时如何消费的

1. 分区/竞争消费(work queue 模式)

  • 每条消息只会被 group 下的一个消费者消费,不会被所有消费者都消费。
  • redis 会将新消息分配给 group 内“空闲”的消费者,实现消息的负载均衡(轮询或空闲优先,具体是由实现的客户端决定)。
  • 多个消费者并发时,消息会被“分摊”到各个消费者,每个消息只会被其中一个消费。
  • 消息被转xclaim到另一个消费者时会增加投递次数,并发时投递次数、时间戳都会变化,因此也只有一个消费者成功获取。xpending命令就可以看到每个消息被投递的次数

2. pending 机制

  • 消费者用 xreadgroup 拉取消息后,消息会进入该消费者的 pending(未确认)列表,直到被 xack。
  • 如果某个消费者挂掉,pending 里的消息可以被其他消费者用 xclaim/xautoclaim 方式“抢救”回来,保证消息最终被消费。

3. 分布式环境下的存储

  • stream的增加数据和其他数据类型一样,都是需要一个唯一的key,然后给key绑定指定数据类型的一个或者多个值
  • 那也就是说,即使在分布式存储环境下,它和其他的key一样,相同的key的数据一定存在同一个分片上(因为redis的分片机制就是按照key来实现的)
  • 实际使用时key的设置就要相对分散,否则数据会倾斜到某些节点上

x. 如果要“广播”效果(每个消费者都收到同一条消息),需要每个消费者用不同的 group。或者都广播了,就使用pub/sub吧,,~~

观测流

  • redis流和消费者组有不同的方式来观察正在发生的事情,比如前面说的xpending ,它允许我们检查在给定时刻正在处理的消息列表,以及它们的空闲时间和交付数量
  • xinfo:这个命令使用子命令来显示流及其消费者组状态的不同信息。例如,xinfo流报告有关流本身的信息。可以用于stream、group、consumers
  • 实际项目中可结合其他命令,直观的展示流的各种信息,比如有多少个分组、有哪些分组、有哪些消费者、消费者状态、消费进度、总条目数据等。有了这些信息就可以对消息的可靠性进行分析,还能及时发现资源占用情况,结合定时任务等作出具体性能调整。

更加详细stream的细节介绍,可以参考官网:https://redis.io/docs/latest/develop/data-types/streams

稍后我将具体介绍如何在代码中使用stream来作为消息队列。

到此这篇关于使用redis的stream数据类型做消息队列的文章就介绍到这了,更多相关redis消息队列内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com