好的,我们来深入理解一下 apache rocketmq 中 message (消息) 这个核心概念。这份文档详细阐述了消息的定义、在模型中的位置、内部属性、约束和使用建议。
推荐阅读:rocketmq 消息message的结构和使用方式详解
你可以将 message
看作是 rocketmq 系统中数据传输和处理的最小原子单位。它承载了业务数据,并附带了丰富的元信息,是生产者、broker 和消费者之间通信的载体。
1.message的本质定义
- 最小传输单元 (smallest unit of data transmission):
message
是 rocketmq 中数据传输的基本单元。生产者将业务数据(负载)和扩展属性封装成message
,发送给 broker;broker 再根据订阅关系将message
传递给消费者。
- 核心特性 (characteristics):
- 不可变性 (immutability):
- 消息一旦生成,其内容(特别是系统属性和负载)在传输和存储过程中不会改变。它被视为一个“已发生的事件”。
- 消费者获取到的消息是只读的 (read-only)。在 5.x 版本中,这是强约束;在 3.x/4.x 版本中虽无强约束,但最佳实践也建议不要修改。
- 最佳实践:如果需要基于收到的消息发送新消息,应该创建一个新消息(例如
messagebuilder.buildfrom(m)
),而不是直接修改原消息。
- 持久性 (persistence):
- 默认情况下,rocketmq 会将接收到的消息持久化存储在 broker 的存储文件中。这是保证消息不丢失、支持消息追溯和系统故障恢复的基础。
- 不可变性 (immutability):
2.message在模型中的位置
- 生命周期流程:
- 生产 (produced):由生产者 (producer) 创建并初始化。
- 发送 (sent):生产者将消息发送到 apache rocketmq broker。
- 存储 (stored):broker 接收到消息后,将其按接收顺序存储在特定 topic 的某个 queue 中。
- 消费 (consumed):消费者 (consumer) 根据订阅关系,从 broker 的相应 queue 中拉取 (pull) 消息进行消费。
3.message的核心内部属性
这些属性分为系统保留属性 (system retention attributes) 和可选属性 (optional attributes),以及负载 (load)。
系统保留属性 (由系统或生产者设置)
- topic 名称 (topic name):
- 作用:标识该消息属于哪个逻辑主题。在集群内必须唯一。
- 来源:由生产者 sdk 设置。
消息类型 (message type):
- 作用:定义消息的语义和处理方式。rocketmq 支持多种类型:
normal
:普通消息,无特殊语义。fifo
:顺序消息,保证同一消息组 (message group) 内的消息按发送顺序被消费。实现依赖于 queue 的有序性。delay
:延迟消息,可以指定延迟时间(最大40天),延迟时间到后才对消费者可见。transaction
:事务消息,用于实现分布式事务,确保本地数据库操作和消息发送的最终一致性。
- 作用:定义消息的语义和处理方式。rocketmq 支持多种类型:
消息队列 (message queue):
- 作用:指明该消息最终被存储在哪个具体的
queue
中(属于哪个 topic 的哪个 queue)。 - 来源:由 broker 在消息到达后,根据路由策略(如轮询、哈希等)指定并填充。
- 作用:指明该消息最终被存储在哪个具体的
消息 offset (message offset):
- 作用:标识该消息在其所属 queue 内部的物理存储位置(偏移量)。是 broker 内部管理消息顺序和消费者消费进度的关键。
- 来源:由 broker 指定并填充。从 0 开始递增。
消息 id (message id):
- 作用:消息的全局唯一标识符。在集群内绝对唯一,用于消息追踪、排查问题。
- 来源:由生产者客户端自动生成(通常是 32 位的数字和大写字母组成的字符串)。
可选属性 (由生产者设置)
- (可选) 消息 keys (message keys):
- 作用:为消息设置一个或多个索引键。主要用于消息查询(通过
message id
或message key
在控制台或通过 api 查找特定消息)和去重(结合业务逻辑)。 - 来源:由生产者客户端定义。
- 作用:为消息设置一个或多个索引键。主要用于消息查询(通过
- (可选) 消息 tag (message tag):
- 作用:消息的标签,用于消费者进行消息过滤。消费者可以订阅特定的 tag,从而只接收带有该 tag 的消息,实现简单的消息分类。
- 来源:由生产者客户端定义。
- 约束:每个消息只能设置一个 tag。
- (可选) 定时时间 (scheduled time):
- 作用:配合
delay
消息类型使用,指定消息延迟的具体时间戳(毫秒级),而不是延迟时长。 - 来源:由消息生产者定义。
- 约束:最大延迟时间 40 天。
- 作用:配合
时间戳属性
- 消息发送时间 (message sending time):
- 作用:记录消息在生产者客户端本地被发送出去的时间戳(毫秒级)。
- 来源:由生产者客户端填充。
- 注意:这是客户端时间,可能与 broker 时间有偏差。
- 消息存储时间 (message store timestamp):
- 作用:记录消息被broker 成功写入存储(落盘)的时间戳(毫秒级)。对于延迟消息和事务消息,消费者感知到的“有效时间”通常基于此时间。
- 来源:由broker 填充。
- 注意:这是 broker 时间,是消息在服务端的“出生”时间。
重试与自定义
- 重试次数 (retry times):
- 作用:记录该消息被 broker 重新投递给消费者的次数。每次消费失败触发重试,次数加一。第一次消费时为 0。
- 来源:由 broker 标记。消费者可以获取此信息以进行幂等处理或特殊逻辑。
- 自定义属性 (custom attributes):
- 作用:生产者可以添加任意的 key-value (字符串类型) 对作为扩展信息,供业务逻辑使用。
- 来源:由生产者根据需要指定。
- 消息负载 (message load):
- 作用:消息的实际业务数据内容,即有效载荷 (payload)。
- 来源:由生产者序列化成二进制字节流后设置。
- 约束:大小不能超过系统限制。
4.message的行为约束
- 大小限制 (size limit):
- 核心约束:单条消息的大小不能超过上限,否则发送会失败。
- 默认限制:4 mb。这是非常重要的参数,直接影响网络传输、存储和处理性能。
5.message的使用建议与最佳实践
- 避免单条消息过大 (overloaded transmission):
- 原因:rocketmq 是事件驱动的中间件。过大的消息会:
- 加重网络传输负担,增加延迟。
- 影响错误重试:重试大消息成本高。
- 影响流控 (throttling):流控粒度可能不够精细。
- 建议:
- 严格控制单条消息的数据量,使其尽可能小。
- 如果业务上必须传输大量数据,强烈建议:
- 拆分消息:将大数据按固定大小拆分成多条小消息。
- 使用外部存储:将实际数据(如文件、图片)存放到对象存储(如 oss)、文件系统或数据库中,然后在消息的
load
或custom attributes
中只传递数据的访问链接 (url) 或 id。
- 原因:rocketmq 是事件驱动的中间件。过大的消息会:
- 遵守消息不可变性原则 (immutability):
- 正确做法:收到消息后,如果需要转发或基于它生成新消息,使用
messagebuilder.buildfrom(m)
这样的方法创建一个新消息实例,然后修改新实例的属性(如 topic, tag, load 等)再发送。 - 错误做法:直接调用
m.update()
修改收到的消息m
的内容,然后发送。这违反了不可变性原则,可能导致不可预知的行为或在 5.x 版本中被拒绝。
- 正确做法:收到消息后,如果需要转发或基于它生成新消息,使用
总结与核心理解
message
是原子单元:它封装了业务数据和元信息,是 rocketmq 传输的最小单位。message
是不可变的:内容一旦产生,在传递过程中不应被修改。最佳实践是“读取-创建-发送”新消息。message
是持久化的:默认落盘存储,保证可靠性。message
拥有丰富的属性:topic/queue/offset
定义了其在系统中的位置和顺序。message id
提供全局唯一标识。keys/tag
支持查询和过滤。message type
定义了语义(普通、顺序、延迟、事务)。sending time/store timestamp
记录了关键时间点。retry times
协助处理消费失败。custom attributes
提供扩展能力。load
承载实际业务数据。
message
有严格的大小限制:默认 4mb。避免大消息是关键设计原则,应通过拆分或外链方式处理大数据。- 最佳实践:
- 合理使用
tag
进行消息过滤。 - 利用
keys
进行消息追踪。 - 遵守不可变性,通过
buildfrom
创建新消息。 - 绝对不要发送超过 4mb 的消息,采用拆分或外链方案。
- 合理使用
简而言之,message
是 rocketmq 的“信封”和“信件”本身。理解其结构、属性、约束和最佳实践,对于设计高效、可靠、可维护的消息系统至关重要。记住:小消息、不可变、善用属性、规避大负载。
发表评论