github:springbootdemo
gitee:springbootdemo
微信公众号:
0 开发环境
- jdk:1.8
- spring boot:2.7.18
- rabbitmq:3.13.1
- erlang:26.2.4
1 安装rabbitmq
1.1 安装rabbitmq
下载地址:https://www.rabbitmq.com/docs/download
windows下载地址:https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.13.1/rabbitmq-server-3.13.1.exe
因为rabbitmq依赖erlang,所以下载完成后先不要安装,先在 https://www.rabbitmq.com/docs/which-erlang 查看所需的erlang版本,下载对应erlang安装包
1.2 安装erlang
erlang下载地址:https://www.erlang.org/downloads
依次安装erlang、rabbitmq
计算机服务中会出现rabbitmq服务,直接右键启动即可。
也可在开始菜单中找到rabbitmq server目录,点击rabbitmq service - start启动
1.3 启用rabbitmq管理界面
默认情况下,rabbitmq没有启用web端客户端插件,需要启用才可以生效
参考文档:https://www.rabbitmq.com/docs/management
进入到rabbitmq安装目录下 rabbitmq server\rabbitmq_server-3.13.1\sbin 执行命令
rabbitmq-plugins enable rabbitmq_management
安装成功,重启rabbitmq服务,浏览器访问 127.0.0.1:15672
默认账号密码都是guest,登录成功后界面如下
2 广播模式fanout
fanout,发布订阅模式,是一种广播机制,是没有路由key的模式。
2.1 新建生产者
新建module spring-boot-rabbitmq-producer
2.1.1 引入依赖
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-amqp</artifactid>
</dependency>
2.1.2 配置rabbitmq
server:
port: 8090
#
#配置rabbitmq服务
spring:
rabbitmq:
username: guest
password: guest
virtual-host: /
host: 127.0.0.1
port: 5672
2.1.3 新建配置类
@configuration
public class rabbitmqfanoutconfig {
/**
* fanout 交换机
*
* @return
*/
@bean
public fanoutexchange fanoutexchange() {
return new fanoutexchange("fanout_user_exchange", true, false);
}
/**
* 发送短信队列
*
* @return
*/
@bean
public queue smsqueue() {
//durable 是否持久化,默认为false
//exclusive 是否只能被当前创建的连接使用,默认为false
//autodelete 是否自动删除,默认为false
//一般设置队列的持久化,其余两个默认false
return new queue("sms.fanout.queue", true);
}
/**
* 发送邮件队列
*
* @return
*/
@bean
public queue emailqueue() {
return new queue("email.fanout.queue", true);
}
/**
* 发送微信队列
*
* @return
*/
@bean
public queue wechatqueue() {
return new queue("wechat.fanout.queue", true);
}
/**
* 将队列和交换机绑定
*
* @return
*/
@bean
public binding smsbindingfanout() {
return bindingbuilder.bind(smsqueue()).to(fanoutexchange());
}
@bean
public binding emailbindingfanout() {
return bindingbuilder.bind(emailqueue()).to(fanoutexchange());
}
@bean
public binding wechatbindingfanout() {
return bindingbuilder.bind(wechatqueue()).to(fanoutexchange());
}
}
2.1.4 新建生产者服务
@service
public class userservice {
@autowired
private rabbittemplate rabbittemplate;
public void register(string username, string password) {
//模拟用户注册
string userid = uuid.randomuuid().tostring();
//saveuser(user);
//发送用户信息给rabbitmq fanout
rabbittemplate.convertandsend("fanout_user_exchange", "", userid + username);
}
}
2.1.5 新建测试类
@springboottest(classes = rabbitmqproducerapplication.class, webenvironment = springboottest.webenvironment.defined_port)
public class rabbitmqproducerapplicationtest {
@autowired
private userservice userservice;
@test
public void contextloads() {
userservice.register("张三", "zhangsan");
}
}
启动测试类,执行成功,查看rabbitmq web页面,成功绑定,且各存在一条消息
2.2 新建消费者
新建module spring-boot-rabbitmq-consumer
2.2.1 引入依赖
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-amqp</artifactid>
</dependency>
2.2.2 配置rabbitmq
server:
port: 8090
#
#配置rabbitmq服务
spring:
rabbitmq:
username: guest
password: guest
virtual-host: /
host: 127.0.0.1
port: 5672
2.2.3 新建短信消费服务
@rabbitlistener(queues = "sms.fanout.queue")
@service
public class fanoutsmsservice {
/**
* 消息接收的方法
*
* @param message
*/
@rabbithandler
public void receivemessage(string message) {
//发送短信
system.out.println("sms.fanout:" + message);
}
}
2.2.4 新建邮件消费服务
@rabbitlistener(queues = "email.fanout.queue")
@service
public class fanoutemailservice {
/**
* 消息接收的方法
*
* @param message
*/
@rabbithandler
public void receivemessage(string message) {
//发送邮件
system.out.println("email.fanout:" + message);
}
}
2.2.5 新建微信消费服务
@rabbitlistener(queues = "wechat.fanout.queue")
@service
public class fanoutwechatservice {
/**
* 消息接收的方法
*
* @param message
*/
@rabbithandler
public void receivemessage(string message) {
//推送微信消息
system.out.println("wechat.fanout:" + message);
}
}
2.3 测试
新建启动类,启动服务,控制台输出如下,消费者成功接收到消息
rabitmq中消息为0
再次执行生产者测试类发送新的消息,消费者成功接收并打印
3 路由模式direct
有routing-key的匹配模式,direct模式是fanout模式上的一种叠加,增加了路由routingkey的模式。
3.1 消费者
3.1.1 新建短信消费服务
这里,我们不在生产者中使用配置类来绑定交换机和队列了,直接在消费者中使用注解来绑定
使用 @rabbitlistener(bindings = @queuebinding()) 注解来绑定交换机和队列
- bindings 用来确定队列和交换机的绑定关系
- value 队列名称,与生产者对应
- exchange 交换机名称,与生产者对应;type设置rabbitmq模式,默认为direct
- key 路由key
但有些高级用法,还是推荐使用配置类
@rabbitlistener(bindings = @queuebinding(
value = @queue(value = "sms.direct.queue", durable = "true", autodelete = "false"),
exchange = @exchange(value = "direct_user_exchange", type = exchangetypes.direct),
key = "sms"))
@service
public class directsmsservice {
/**
* 消息接收的方法
*
* @param message
*/
@rabbithandler
public void receivemessage(string message) {
//发送短信
system.out.println("sms.direct:" + message);
}
}
3.1.2 新建邮件消费服务
@rabbitlistener(bindings = @queuebinding(
value = @queue(value = "email.direct.queue", durable = "true", autodelete = "false"),
exchange = @exchange(value = "direct_user_exchange"),
key = "email"))
@service
public class directemailservice {
/**
* 消息接收的方法
*
* @param message
*/
@rabbithandler
public void receivemessage(string message) {
//发送邮件
system.out.println("email.direct:" + message);
}
}
3.1.3 新建微信消费服务
@rabbitlistener(bindings = @queuebinding(
value = @queue(value = "wechat.direct.queue", durable = "true", autodelete = "false"),
exchange = @exchange(value = "direct_user_exchange"),
key = "wechat"))
@service
public class directwechatservice {
/**
* 消息接收的方法
*
* @param message
*/
@rabbithandler
public void receivemessage(string message) {
//推送微信消息
system.out.println("wechat.direct:" + message);
}
}
3.2 生产者
3.2.1 调整服务类
@service
public class userservice {
@autowired
private rabbittemplate rabbittemplate;
//...
public void registerdirect(string username, string password) {
//模拟用户注册
string userid = uuid.randomuuid().tostring();
//saveuser(user);
rabbittemplate.convertandsend("direct_user_exchange", "sms", userid + username);
rabbittemplate.convertandsend("direct_user_exchange", "email", userid + username);
}
}
3.2.2 调整测试类
@springboottest(classes = rabbitmqproducerapplication.class, webenvironment = springboottest.webenvironment.defined_port)
public class rabbitmqproducerapplicationtest {
@autowired
private userservice userservice;
@test
public void contextloads() {
userservice.register("张三", "zhangsan");
}
@test
public void contextloadsdirect() {
userservice.registerdirect("李四", "lisi");
}
}
3.3 测试
启动消费者服务,执行生产者测试类contextloadsdirect,消费者成功收到消息并打印
4 主题模式topic
模糊的routing-key的匹配模式,topic模式是direct模式上的一种叠加,增加了模糊路由routingkey的模式。
4.1 消费者
4.1.1 新建配置类
一般在生产中,先启动消费者服务,再启动生产者服务,因此我们通常把配置类放在消费者服务中,否则可能导致无队列监听而服务启动失败或报错
@configuration
public class rabbitmqtopicconfig {
/**
* topic 交换机
*
* @return
*/
@bean
public topicexchange topicexchange() {
return new topicexchange("topic_user_exchange", true, false);
}
/**
* 发送短信队列
*
* @return
*/
@bean
public queue smsqueue() {
//durable 是否持久化,默认为false
//exclusive 是否只能被当前创建的连接使用,默认为false
//autodelete 是否自动删除,默认为false
//一般设置队列的持久化,其余两个默认false
return new queue("sms.topic.queue", true);
}
/**
* 发送邮件队列
*
* @return
*/
@bean
public queue emailqueue() {
return new queue("email.topic.queue", true);
}
/**
* 发送微信队列
*
* @return
*/
@bean
public queue wechatqueue() {
return new queue("wechat.topic.queue", true);
}
/**
* 将队列和交换机绑定, 并设置用于匹配键
*
* @return
*/
@bean
public binding smsbindingtopic() {
return bindingbuilder.bind(smsqueue()).to(topicexchange()).with("*.sms.#");
}
@bean
public binding emailbindingtopic() {
return bindingbuilder.bind(emailqueue()).to(topicexchange()).with("#.email.#");
}
@bean
public binding wechatbindingtopic() {
return bindingbuilder.bind(wechatqueue()).to(topicexchange()).with("#.wechat.*");
}
}
4.1.2 新建短信消费服务
@rabbitlistener(queues = "sms.topic.queue")
@service
public class topicsmsservice {
/**
* 消息接收的方法
*
* @param message
*/
@rabbithandler
public void receivemessage(string message) {
//发送短信
system.out.println("sms.topic:" + message);
}
}
4.1.3 新建邮件消费服务
@rabbitlistener(queues = "email.topic.queue")
@service
public class topicemailservice {
/**
* 消息接收的方法
*
* @param message
*/
@rabbithandler
public void receivemessage(string message) {
//发送邮件
system.out.println("email.topic:" + message);
}
}
4.1.4 新建微信消费服务
@rabbitlistener(queues = "wechat.topic.queue")
@service
public class topicwechatservice {
/**
* 消息接收的方法
*
* @param message
*/
@rabbithandler
public void receivemessage(string message) {
//推送微信消息
system.out.println("wechat.topic:" + message);
}
}
4.2 生产者
4.2.1 调整服务类
public void registertopic(string username, string password) {
//模拟用户注册
string userid = uuid.randomuuid().tostring();
//saveuser(user);
rabbittemplate.convertandsend("topic_user_exchange", "*.sms.email.*", userid + username);
}
4.2.2 调整测试类
@test
public void contextloadstopic() {
userservice.registertopic("王五", "wangwu");
}
4.3 测试
启动消费者服务,执行生产者测试类contextloadstopic,消费者成功收到消息并打印
5 设置过期时间
5.1 调整消费者配置类
调整rabbitmqtopicconfig
/**
* durable 创建持久化队列phone.topic.queue
* withargument 设置消息过期时间60000毫秒
*
* @return
*/
@bean
public queue phonequeue() {
return queuebuilder.durable("phone.topic.queue").withargument("x-message-ttl", 60000).build();
}
@bean
public binding phonebindingtopic() {
return bindingbuilder.bind(phonequeue()).to(topicexchange()).with("#.phone.#");
}
5.2 测试
启动服务,rabitmq中队列如下,队列标识为ttl
6 消息确认机制
以下均为生产者module的调整
6.1 调整生产者yml
server:
port: 8090
#
#配置rabbitmq服务
spring:
rabbitmq:
username: guest
password: guest
virtual-host: /
host: 127.0.0.1
port: 5672
#none 禁用发布确认模式,默认值
#correlated 消息成功发布到交换机后会触发回调方法
publisher-confirm-type: correlated
6.2 新建回调方法
public class messageconfirmcallback implements rabbittemplate.confirmcallback {
@override
public void confirm(correlationdata correlationdata, boolean b, string s) {
if (b) {
system.out.println("消息确认成功");
} else {
system.out.println("消息确认失败");
}
}
}
6.3 调整服务类
public void registertopiccallback(string username, string password) {
//模拟用户注册
string userid = uuid.randomuuid().tostring();
//saveuser(user);
//设置消息确认
rabbittemplate.setconfirmcallback(new messageconfirmcallback());
rabbittemplate.convertandsend("topic_user_exchange", "*.sms.email.*", userid + username);
}
6.4 调整测试类
@test
public void contextloadstopiccallback() {
userservice.registertopiccallback("周六", "zhouliu");
}
执行测试类contextloadstopiccallback,成功进入回调方法
7 死信队列
dlx(dead-letter-exchange),称为死信交换机或死信邮箱。
当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是dlx ,绑定dlx的队列就是死信队列。
消息变成死信,可能是由于以下的原因:
- 消息被拒绝
- 消息过期
- 队列达到最大长度
dlx也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。
当这个队列中存在死信时,rabbitmq就会自动将这个消息重新发布到设置的dlx上去,进而被路由到另一个队列,即死信队列。
在定义队列的时候设置队列参数 x-dead-letter-exchange
指定交换机即可设置为死信队列。
7.1 调整消费者配置类
调整rabbitmqtopicconfig
@bean
public queue dlxqueue() {
return new queue("dlx.topic.queue", true);
}
@bean
public queue vipqueue() {
return queuebuilder.durable("vip.topic.queue")
.withargument("x-message-ttl", 60000)
.withargument("x-dead-letter-exchange", "dlx.topic.queue")
.build();
}
@bean
public binding dlxbindingtopic() {
return bindingbuilder.bind(dlxqueue()).to(topicexchange()).with("#.#");
}
@bean
public binding vipbindingtopic() {
return bindingbuilder.bind(vipqueue()).to(topicexchange()).with("#.vip.#");
}
7.2 测试
启动服务,rabitmq中队列如下,队列标识为dlx
7.3 调整生产者服务类
public void registertopicdlx(string username, string password) {
//模拟用户注册
string userid = uuid.randomuuid().tostring();
//saveuser(user);
rabbittemplate.convertandsend("topic_user_exchange", "*.vip.*", userid + username);
}
7.4 调整生产者测试类
@test
public void contextloadstopicdlx() {
userservice.registertopicdlx("钱七", "qianqi");
}
7.5 测试
关闭消费者服务,执行生产者测试类contextloadstopicdlx
未过期前
过期后
7.6 新建死信消费服务
消费者新建
@rabbitlistener(queues = "dlx.topic.queue")
@service
public class topicdlxservice {
/**
* 消息接收的方法
*
* @param message
*/
@rabbithandler
public void receivemessage(string message) {
system.out.println("dlx.topic:" + message);
}
}
7.7 测试
再启动消费者服务,成功接收死信队列中消息
rabbitmq中消息队列如下
至此,springboot成功整合rabbitmq且测试通过,集群、分布式事务等用法敬请等待后续。
发表评论