确保 rabbitmq 的延时消息插件已经安装和启用。你可以通过执行以下命令来安装该插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
如果提示未安装,以下是安装流程:
查看mq版本:
下载插件
查看延时队列
确保你的项目中引入了 spring-rabbit 依赖。
在你的 spring boot 配置文件中添加以下配置:
spring:
rabbitmq:
host: 127.0.0.1
password: ***
port: 5672
username: ***
virtual-host: ***
这些属性是用于配置spring框架中与rabbitmq消息队列相关的属性。让我逐个介绍一下每个属性的作用:
rabbitmq.host: 指定rabbitmq服务器的主机地址。在这个示例中,主机地址是127.0.0.1,也就是本地主机。
rabbitmq.port: 指定rabbitmq服务器的端口号。默认的rabbitmq端口号是5672,因此在这个示例中,端口号为5672。
rabbitmq.username: 指定连接rabbitmq服务器时使用的用户名。在这个示例中,用户名是***。
rabbitmq.password: 指定连接rabbitmq服务器时使用的密码。在这个示例中,密码是***。
rabbitmq.virtual-host: 指定rabbitmq服务器的虚拟主机。虚拟主机是rabbitmq中用于隔离不同应用之间的消息队列的逻辑概念。在这个示例中,虚拟主机是***。
rabbitmq管理平台配置延时队列:
直接使用 direct 类型的交换机是无法实现延时队列功能的。direct 类型的交换机是根据 routing key 进行精确匹配,将消息发送到与 routing key 完全匹配的队列中,而不会对消息进行延时投递。
如果你需要实现延时队列功能,可以考虑使用 rabbitmq 插件 rabbitmq_delayed_message_exchange 提供的 x-delayed-message 类型的交换机。该插件允许你在 rabbitmq 中创建一个特殊类型的交换机,支持延时消息的投递。
使用 x-delayed-message 类型的交换机,你可以在发送消息时设置消息的头部属性作为延时时间,并且消息将会在指定的延时时间之后被投递到相应的队列。
请注意,使用 x-delayed-message 类型的交换机需要安装并启用 rabbitmq_delayed_message_exchange 插件。安装完成后,你可以在 rabbitmq 中创建该类型的交换机,并按需使用延时队列功能。
当你在 rabbitmq 中新增一个交换机时,需要设置一些属性来定义该交换机的行为。下面是各个属性的详细介绍:
name(名称):
交换机的名称,用于标识该交换机。
名称必须是唯一的。
type(类型):
交换机的类型,用于定义消息的路由方式。
常见的交换机类型包括 direct、fanout、headers、topic 和 x-delayed-message。
durability(持久化):
交换机是否持久化到磁盘。
如果将此属性设置为 true,则 rabbitmq 会将该交换机保存到磁盘上,以便在服务器重启后仍然存在。
auto delete(自动删除):
是否在最后一个绑定到交换机的队列被删除后自动删除交换机。
如果将此属性设置为 true,则 rabbitmq 会在没有任何队列绑定到该交换机时自动删除该交换机。
internal(内部):
是否是一个内部交换机。
如果将此属性设置为 true,则该交换机只能被其他交换机使用,而不能被客户端直接发送消息到该交换机。
arguments(参数):
交换机的其他参数,用于控制交换机的行为。
argument 的值可以是任何有效的 erlang 对象。
在设置延时队列时,你需要使用 x-delayed-type 参数来指定该交换机的类型为 x-delayed-message,并使用 x-delayed-message 的头部属性 x-delay 来指定消息的延迟时间。
通过配置这些属性,spring框架可以使用指定的主机、端口、用户名和密码来连接rabbitmq服务器,并在指定的虚拟主机中进行消息队列的操作。这样,我们就可以在spring应用程序中方便地使用rabbitmq来实现消息的发送和接收。
在你的代码中,使用 rabbittemplate 的 convertandsend() 方法发送消息到延时队列。同时,设置消息的延时时间。
import org.springframework.amqp.core.message;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.component;
@component
public class delayedqueueexample {
@autowired
private rabbittemplate rabbittemplate;
public void sendmessagewithdelay(messagedto messagedto, int delayinmillis) {
rabbittemplate.convertandsend("my-delayed_exchange", "sms.api.routing",
messagedto, message -> {
message.getmessageproperties().setheader("x-delay", delayinmillis);//毫秒
return message;
});
system.out.println("message sent to delayed queue.");
}
}
在这个示例中,我们使用 rabbittemplate 的 convertandsend() 方法发送消息到名为 delayed_exchange 的交换机,并指定路由键为 ums.api.routing。我们还为消息设置了延时时间,通过在 convertandsend() 方法中使用 lambda 表达式修改消息的属性。
请注意,你需要将 delayed_exchange 替换为实际的交换机名称,根据你的需求修改路由键和消息体。此外,确保 messagedto 类是可序列化的,并在你的项目中定义。
这样,你的消息就会被发送到延时队列中,等待指定的延时时间后才会被消费。
如果在 x-delayed-message 交换机中不设置延迟时间,其行为与普通的 direct 交换机非常相似。
direct 交换机是 rabbitmq 中最简单的一种交换机类型,它将消息发送到与绑定键完全匹配的队列。当你使用 direct 交换机时,可以通过指定绑定键来路由消息到特定的队列。
x-delayed-message 交换机是一个插件提供的自定义交换机,它允许你发送带有延迟的消息。你可以通过设置消息的 x-delay 属性来指定消息的延迟时间。当消息到达 x-delayed-message 交换机时,交换机会根据消息的延迟时间将其暂时存储,然后在延迟时间到达后将其转发到相应的队列。
如果在 x-delayed-message 交换机中不设置延迟时间(即所有消息的 x-delay 属性都为零或未设置),该交换机将表现得与 direct 交换机相同。消息将立即被路由到匹配的队列,没有任何延迟。
因此,如果不需要延迟消息功能,只需直接发送消息到 x-delayed-message 交换机,它将像 direct 交换机一样工作。如果需要延迟消息功能,请确保为每条消息设置适当的 x-delay 属性,以指定延迟时间。
已经创建好的交换机不能再更新type:
rabbitmq 中的交换机类型是在创建时确定的,一旦交换机创建后,就无法直接修改其类型。要更改交换机的类型,你需要按照以下步骤进行操作:
创建一个新的交换机,使用所需的类型。例如,如果你想将交换机从 direct 类型更改为 fanout 类型,可以创建一个新的 fanout 交换机。
将绑定到旧交换机上的队列和绑定迁移到新交换机。这可以通过解绑旧交换机和绑定新交换机来实现。
解绑旧交换机:使用命令或管理界面将所有队列从旧交换机解绑。这将使队列不再接收来自旧交换机的消息。
绑定新交换机:使用相同的绑定规则将队列绑定到新交换机上。这将确保消息可以通过新的交换机路由到正确的队列。
确保在转移过程中不会丢失任何消息。你可以选择在迁移前停止消息发布者,以确保没有新消息发送到旧交换机上。然后,在完成旧交换机和新交换机之间的迁移后,重新启动消息发布者。
请注意,在进行交换机类型更改时,可能会涉及到一些风险和潜在问题,因此建议在生产环境中小心操作并进行充分的测试。
另外,如果你只是想更改交换机的属性而不是类型,例如更改交换机的名称、持久性或其他属性,你可以使用 rabbitmq 的管理界面或命令行工具对交换机进行更新。
交换机类型区别:
在 rabbitmq 中,有几种不同类型的交换机(exchange),包括 direct、fanout、headers、topic 和 x-delayed-message。下面我将详细描述每种类型的区别:
direct(直连交换机):
direct 交换机是最简单的一种类型,它将消息直接路由到与绑定键(binding key)完全匹配的队列。
发送到 direct 交换机的消息需要指定一个 routing key,该 routing key 与队列绑定时指定的 binding key 进行匹配。
fanout(扇形交换机):
fanout 交换机会将消息广播到绑定到它的所有队列,无论绑定键是什么。
发送到 fanout 交换机的消息会被转发到所有与之绑定的队列,而 routing key 将被忽略。
headers(标头交换机):
headers 交换机使用消息的标头属性来进行匹配,而不是使用 routing key。
发送到 headers 交换机的消息会根据标头属性的匹配情况被转发到相应的队列。
topic(主题交换机):
topic 交换机使用通配符和 routing key 的模式匹配来路由消息。
发送到 topic 交换机的消息的 routing key 是一个由多个单词组成的字符串,这些单词用点号(.)分隔。
消息的 routing key 将被与队列绑定时指定的 binding key 进行模式匹配,符合匹配规则的消息将被路由到相应的队列。
x-delayed-message(延迟消息交换机):
x-delayed-message 交换机是 rabbitmq 的一个插件提供的特殊类型的交换机,它支持延迟消息的投递。
x-delayed-message 交换机可以根据消息的头部属性中设置的延迟时间来进行延迟投递。
使用 x-delayed-message 交换机时,需要安装并启用 rabbitmq 的 "rabbitmq_delayed_message_exchange" 插件。
总结:
direct 交换机通过完全匹配 routing key 将消息发送到指定队列;
fanout 交换机广播消息到所有绑定的队列;
headers 交换机根据标头属性来匹配并转发消息;
topic 交换机通过模式匹配 routing key 来转发消息;
x-delayed-message 交换机用于延迟消息的投递,需要安装额外的插件。
不同类型的交换机在消息路由和转发方式上有所不同,选择正确的交换机类型有助于实现所需的消息传递逻辑。
发表评论