当前位置: 代码网 > 服务器>服务器>Linux > 深入理解Apache RocketMQ 中Message 消息的核心概念

深入理解Apache RocketMQ 中Message 消息的核心概念

2025年08月01日 Linux 我要评论
好的,我们来深入理解一下 apache rocketmq 中 message (消息) 这个核心概念。这份文档详细阐述了消息的定义、在模型中的位置、内部属性、约束和使用建议。推荐阅读:rocketmq

好的,我们来深入理解一下 apache rocketmq 中 message (消息) 这个核心概念。这份文档详细阐述了消息的定义、在模型中的位置、内部属性、约束和使用建议。

推荐阅读:rocketmq 消息message的结构和使用方式详解

你可以将 message 看作是 rocketmq 系统中数据传输和处理的最小原子单位。它承载了业务数据,并附带了丰富的元信息,是生产者、broker 和消费者之间通信的载体。

1.message的本质定义

  • 最小传输单元 (smallest unit of data transmission)
    • message 是 rocketmq 中数据传输的基本单元。生产者将业务数据(负载)和扩展属性封装成 message,发送给 broker;broker 再根据订阅关系将 message 传递给消费者。
  • 核心特性 (characteristics)
    • 不可变性 (immutability)
      • 消息一旦生成,其内容(特别是系统属性和负载)在传输和存储过程中不会改变。它被视为一个“已发生的事件”。
      • 消费者获取到的消息是只读的 (read-only)。在 5.x 版本中,这是强约束;在 3.x/4.x 版本中虽无强约束,但最佳实践也建议不要修改。
      • 最佳实践:如果需要基于收到的消息发送新消息,应该创建一个新消息(例如 messagebuilder.buildfrom(m)),而不是直接修改原消息。
    • 持久性 (persistence)
      • 默认情况下,rocketmq 会将接收到的消息持久化存储在 broker 的存储文件中。这是保证消息不丢失、支持消息追溯和系统故障恢复的基础。

2.message在模型中的位置

  • 生命周期流程
    1. 生产 (produced):由生产者 (producer) 创建并初始化。
    2. 发送 (sent):生产者将消息发送到 apache rocketmq broker
    3. 存储 (stored):broker 接收到消息后,将其按接收顺序存储在特定 topic 的某个 queue 中。
    4. 消费 (consumed)消费者 (consumer) 根据订阅关系,从 broker 的相应 queue 中拉取 (pull) 消息进行消费。

3.message的核心内部属性

这些属性分为系统保留属性 (system retention attributes)可选属性 (optional attributes),以及负载 (load)

系统保留属性 (由系统或生产者设置)

  • topic 名称 (topic name)
    • 作用:标识该消息属于哪个逻辑主题。在集群内必须唯一。
    • 来源:由生产者 sdk 设置。
  • 消息类型 (message type)

    • 作用:定义消息的语义和处理方式。rocketmq 支持多种类型:
      • normal:普通消息,无特殊语义。
      • fifo:顺序消息,保证同一消息组 (message group) 内的消息按发送顺序被消费。实现依赖于 queue 的有序性。
      • delay:延迟消息,可以指定延迟时间(最大40天),延迟时间到后才对消费者可见。
      • transaction:事务消息,用于实现分布式事务,确保本地数据库操作和消息发送的最终一致性。
  • 消息队列 (message queue)

    • 作用:指明该消息最终被存储在哪个具体的 queue 中(属于哪个 topic 的哪个 queue)。
    • 来源:由 broker 在消息到达后,根据路由策略(如轮询、哈希等)指定并填充
  • 消息 offset (message offset)

    • 作用:标识该消息在其所属 queue 内部的物理存储位置(偏移量)。是 broker 内部管理消息顺序和消费者消费进度的关键。
    • 来源:由 broker 指定并填充。从 0 开始递增。
  • 消息 id (message id)

    • 作用:消息的全局唯一标识符。在集群内绝对唯一,用于消息追踪、排查问题。
    • 来源:由生产者客户端自动生成(通常是 32 位的数字和大写字母组成的字符串)。

可选属性 (由生产者设置)

  • (可选) 消息 keys (message keys)
    • 作用:为消息设置一个或多个索引键。主要用于消息查询(通过 message idmessage key 在控制台或通过 api 查找特定消息)和去重(结合业务逻辑)。
    • 来源:由生产者客户端定义。
  • (可选) 消息 tag (message tag)
    • 作用:消息的标签,用于消费者进行消息过滤。消费者可以订阅特定的 tag,从而只接收带有该 tag 的消息,实现简单的消息分类。
    • 来源:由生产者客户端定义。
    • 约束:每个消息只能设置一个 tag
  • (可选) 定时时间 (scheduled time)
    • 作用:配合 delay 消息类型使用,指定消息延迟的具体时间戳(毫秒级),而不是延迟时长。
    • 来源:由消息生产者定义。
    • 约束:最大延迟时间 40 天

时间戳属性

  • 消息发送时间 (message sending time)
    • 作用:记录消息在生产者客户端本地被发送出去的时间戳(毫秒级)。
    • 来源:由生产者客户端填充
    • 注意:这是客户端时间,可能与 broker 时间有偏差。
  • 消息存储时间 (message store timestamp)
    • 作用:记录消息被broker 成功写入存储(落盘)的时间戳(毫秒级)。对于延迟消息和事务消息,消费者感知到的“有效时间”通常基于此时间。
    • 来源:由broker 填充
    • 注意:这是 broker 时间,是消息在服务端的“出生”时间。

重试与自定义

  • 重试次数 (retry times)
    • 作用:记录该消息被 broker 重新投递给消费者的次数。每次消费失败触发重试,次数加一。第一次消费时为 0。
    • 来源:由 broker 标记。消费者可以获取此信息以进行幂等处理或特殊逻辑。
  • 自定义属性 (custom attributes)
    • 作用:生产者可以添加任意的 key-value (字符串类型) 对作为扩展信息,供业务逻辑使用。
    • 来源:由生产者根据需要指定。
  • 消息负载 (message load)
    • 作用:消息的实际业务数据内容,即有效载荷 (payload)。
    • 来源:由生产者序列化成二进制字节流后设置。
    • 约束:大小不能超过系统限制。

4.message的行为约束

  • 大小限制 (size limit)
    • 核心约束:单条消息的大小不能超过上限,否则发送会失败。
    • 默认限制4 mb。这是非常重要的参数,直接影响网络传输、存储和处理性能。

5.message的使用建议与最佳实践

  • 避免单条消息过大 (overloaded transmission)
    • 原因:rocketmq 是事件驱动的中间件。过大的消息会:
      • 加重网络传输负担,增加延迟。
      • 影响错误重试:重试大消息成本高。
      • 影响流控 (throttling):流控粒度可能不够精细。
    • 建议
      1. 严格控制单条消息的数据量,使其尽可能小。
      2. 如果业务上必须传输大量数据,强烈建议
        • 拆分消息:将大数据按固定大小拆分成多条小消息。
        • 使用外部存储:将实际数据(如文件、图片)存放到对象存储(如 oss)、文件系统或数据库中,然后在消息的 loadcustom attributes只传递数据的访问链接 (url) 或 id
  • 遵守消息不可变性原则 (immutability)
    • 正确做法:收到消息后,如果需要转发或基于它生成新消息,使用 messagebuilder.buildfrom(m) 这样的方法创建一个新消息实例,然后修改新实例的属性(如 topic, tag, load 等)再发送。
    • 错误做法:直接调用 m.update() 修改收到的消息 m 的内容,然后发送。这违反了不可变性原则,可能导致不可预知的行为或在 5.x 版本中被拒绝。

总结与核心理解

  1. message 是原子单元:它封装了业务数据和元信息,是 rocketmq 传输的最小单位。
  2. message 是不可变的:内容一旦产生,在传递过程中不应被修改。最佳实践是“读取-创建-发送”新消息。
  3. message 是持久化的:默认落盘存储,保证可靠性。
  4. message 拥有丰富的属性:
    • topic/queue/offset 定义了其在系统中的位置和顺序。
    • message id 提供全局唯一标识。
    • keys/tag 支持查询和过滤。
    • message type 定义了语义(普通、顺序、延迟、事务)。
    • sending time/store timestamp 记录了关键时间点。
    • retry times 协助处理消费失败。
    • custom attributes 提供扩展能力。
    • load 承载实际业务数据。
  5. message 有严格的大小限制:默认 4mb。避免大消息是关键设计原则,应通过拆分或外链方式处理大数据。
  6. 最佳实践
    • 合理使用 tag 进行消息过滤。
    • 利用 keys 进行消息追踪。
    • 遵守不可变性,通过 buildfrom 创建新消息。
    • 绝对不要发送超过 4mb 的消息,采用拆分或外链方案。

简而言之,message 是 rocketmq 的“信封”和“信件”本身。理解其结构、属性、约束和最佳实践,对于设计高效、可靠、可维护的消息系统至关重要。记住:小消息、不可变、善用属性、规避大负载

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com