什么是rabbitmq
rabbitmq与kafka的区别
rabbitmq与kafka的各自适用场景
rabbitmq的安装
erlang下载安装
wget --content-disposition https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-22.3.4.12-1.el7.x86_64.rpm/download.rpm
yum localinstall erlang-22.3.4.12-1.el7.x86_64.rpm
rabbitmq下载安装
wget --content-disposition https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmq-server-3.8.13-1.el7.noarch.rpm/download.rpm
rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
yum localinstall rabbitmq-server-3.8.13-1.el7.noarch.rpm
rabbotmq的启动
systemctl start rabbitmq-server #启动
systemctl enable rabbitmq-server #设置开机自启
systemctl status rabbitmq-server #查看状态
rabbitmq web界面管理
默认情况下,是没有安装web端的客户端插件,需要安装插件才可以生效。执行命令:
rabbitmq-plugins enable rabbitmq_management
然后需要重启服务
systemctl restart rabbitmq-server
由于web管理界面访问端口为15672,所以防火墙需要放行该端口
对于 centos 7 上的防火墙,要放行端口 15672(默认 rabbitmq 管理界面的端口),可以按照以下步骤进行操作:
-
登录到 centos 7 的服务器上,以具有管理员权限的用户身份。
-
检查防火墙状态,确认是否已安装 firewalld 防火墙:
systemctl status firewalld
-
如果防火墙处于开启状态,可以直接跳转到第 6 步。如果防火墙停止运行,则需要启动,请继续执行以下步骤。
-
启动 firewalld 服务:
systemctl start firewalld
-
设置 firewalld 开机自启:
systemctl enable firewalld
-
添加端口规则,允许在防火墙上开放 15672 端口和5672端口:
firewall-cmd --zone=public --add-port=15672/tcp --permanent firewall-cmd --zone=public --add-port=5672/tcp --permanent
-
重新加载防火墙配置,使更改生效:
firewall-cmd --reload
现在,centos 7 的防火墙应该已经放行了 15672 端口和5672 端口,允许对 rabbitmq 管理界面进行访问。请注意,为了安全起见,建议仅在需要时才开放必要的端口,并在完成使用后关闭不必要的端口。
web界面访问管理
rabbitmq 默认的管理界面账号和密码通常是:
- 用户名:guest
- 密码:guest
这对默认凭据在 rabbitmq 安装后可用于访问管理界面(只限于本地)。然而,出于安全考虑,强烈建议在生产环境中修改默认凭据或创建新的管理员帐户,并使用更强大的密码来加强安全性。
要在 rabbitmq 中添加新用户,您需要使用 rabbitmq 提供的命令行工具或者管理界面进行操作。下面是两种方法的简要说明:
方法一:使用 rabbitmq 命令行工具
-
打开命令行终端。
-
导航到 rabbitmq 安装目录的 sbin 文件夹(例如,在 linux 上可能是 /usr/lib/rabbitmq/sbin)。
-
运行以下命令来添加新用户:
rabbitmqctl add_user <username> <password>
将
<username>
替换为要创建的用户名,将<password>
替换为所需的密码。 -
运行以下命令来赋予用户管理员权限:
rabbitmqctl set_user_tags <username> administrator
将
<username>
替换为刚创建的用户名。
方法二:使用 rabbitmq 管理界面
- 打开您的浏览器并访问 rabbitmq 管理界面。默认地址为
http://localhost:15672
。 - 使用默认的管理员账号和密码(通常是
guest
/guest
)登录到管理界面。 - 在管理界面上导航到 “admin” -> “users” 选项卡。
- 单击 “add a user” 按钮。
- 输入用户名和密码,并选择 “tag” 为 “administrator”。
- 单击 “add user” 按钮以创建新用户。
无论您使用哪种方法,确保为新用户选择一个强大的密码,并在生产环境中遵循安全最佳实践。
访问方式为 主机ip地址配合端口号,例如 192.168.18.14:15672
注意,此处需要进行一次授权,否则在代码中连接rabbitmq会失败
springboot+rabbitmq实战
引入依赖
<!--rabbitmq-->
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-amqp</artifactid>
</dependency>
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-web</artifactid>
</dependency>
修改配置文件
server:
port: 8080
spring:
application:
name: rabbitmqexample
rabbitmq:
host: 192.168.18.14
port: 5672
username: admin
password: admin
rabbitmq五种消息模型
rabbitmq简单模式(生产者消费者模式)
配置类
import org.springframework.amqp.core.queue;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
/**
* @description mq配置类
* @author it小辉同学
* @date 2023/06/16
*/
@configuration
public class rabbitmqconfig {
/**
* @return {@link queue }
* @description 设置队列
* @author it小辉同学
* @date 2023/06/16
*/
@bean
public queue queue(){
return new queue("simple.hello");
}
}
生产者
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.service;
/**
* @description 生产者(发送消息)
* @author it小辉同学
* @date 2023/06/16
*/
@service
public class messagesender {
@autowired
private rabbittemplate rabbittemplate;
/**
* @param message 消息
* @description 发送消息
* @author it小辉同学
* @date 2023/06/16
*/
public void sendmessage(string message) {
system.out.println("发送祝福:" + message);
rabbittemplate.convertandsend("simple.hello", message);
}
}
消费者
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.service;
/**
* @description 消息接收器
* @author it小辉同学
* @date 2023/06/16
*/
@service
public class messagereceiver {
@autowired
private rabbittemplate rabbittemplate;
/**
* @param message 消息
* @description 处理消息
* @author it小辉同学
* @date 2023/06/16
*/
@rabbitlistener(queues = "simple.hello")
public void handlemessage(string message) {
system.out.println("我收到了你的祝福: " + message);
}
}
测试
import com.xiaohui.service.messagesender;
import org.junit.jupiter.api.test;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.boot.test.context.springboottest;
@springboottest
public class mqtestdemo {
@autowired
private messagesender messagesender;
@test
public void testdemo1(){
messagesender.sendmessage("我想跟你说:希望你开心快乐!!!");
}
}
注意:不可以省略@springboottest,否则监听不到mq配置
rabbitmq工作队列模式(广播模式)
配置类
import org.springframework.amqp.core.binding;
import org.springframework.amqp.core.bindingbuilder;
import org.springframework.amqp.core.fanoutexchange;
import org.springframework.amqp.core.queue;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
/**
* @description mq配置类
* @author it小辉同学
* @date 2023/06/16
*/
@configuration
public class rabbitmqconfig {
//队列1
private static final string queue01 = "queue01";
//队列2
private static final string queue02 = "queue02";
//交换机
private static final string exchange_name = "fanout_exchange";
@bean
public queue queue1() {
return new queue(queue01);
}
@bean
public queue queue2() {
return new queue(queue02);
}
@bean
public fanoutexchange fanoutexchange() {
return new fanoutexchange(exchange_name);
}
@bean
public binding binding01() {
return bindingbuilder.bind(queue1()).to(fanoutexchange());
}
@bean
public binding binding02() {
return bindingbuilder.bind(queue2()).to(fanoutexchange());
}
}
生产者
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.service;
/**
* @description 生产者(发送消息)
* @author it小辉同学
* @date 2023/06/16
*/
@service
public class messagesender {
@autowired
private rabbittemplate rabbittemplate;
/**
* @param message 消息
* @description 发送消息
* @author it小辉同学
* @date 2023/06/16
*/
public void sendmessage(string message) {
system.out.println( message);
rabbittemplate.convertandsend("fanout_exchange","", message);
}
}
消费者
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.service;
/**
* @description 消息接收器
* @author it小辉同学
* @date 2023/06/16
*/
@service
public class messagereceiver {
@autowired
private rabbittemplate rabbittemplate;
/**
* @param message 消息
* @description 消费者01
* @author it小辉同学
* @date 2023/06/16
*/
@rabbitlistener(queues = "queue01")
public void receiver01(string message) {
system.out.println("队列01:奔赴山海," + message);
}
/**
* @param message 消息
* @description 消费者012
* @author it小辉同学
* @date 2023/06/16
*/
@rabbitlistener(queues = "queue02")
public void receiver02(string message) {
system.out.println("队列02:向阳而生," + message);
}
}
测试
@springboottest
public class mqtestdemo {
@autowired
private messagesender messagesender;
@test
public void testdemo1(){
messagesender.sendmessage("相信梦想。。。。。。");
}
}
rabbitmq发布/订阅模式
配置类
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
/**
* @description mq配置类
* @author it小辉同学
* @date 2023/06/16
*/
@configuration
public class rabbitmqconfig {
//队列01
private static final string queue01= "queue01";
//队列02
private static final string queue02= "queue02";
//交换机
private static final string exchange_name = "direct_exchange";
//路由键01
private static final string routingkey01 = "queue_route01";
//路由键02
private static final string routingkey02 = "queue_route02";
@bean
public queue queue01(){
return new queue(queue01);
}
@bean
public queue queue02(){
return new queue(queue02);
}
@bean
public directexchange directexchange(){
return new directexchange(exchange_name);
}
@bean
public binding binding1(){
//将列队01绑定到交换机上为给他设置路由键
return bindingbuilder.bind(queue01()).to(directexchange()).with(routingkey01);
}
@bean
public binding binding2(){
//将列队02绑定到交换机上为给他设置路由键
return bindingbuilder.bind(queue02()).to(directexchange()).with(routingkey02);
}
}
生产者
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.service;
/**
* @description 生产者(发送消息)
* @author it小辉同学
* @date 2023/06/16
*/
@service
public class messagesender {
@autowired
private rabbittemplate rabbittemplate;
/**
* @param message 消息
* @description 发送消息
* @author it小辉同学
* @date 2023/06/16
*/
public void sendmessage(string message) {
system.out.println( message);
rabbittemplate.convertandsend("direct_exchange","queue_route01", message);
}
}
消费者
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.service;
/**
* @description 消息接收器
* @author it小辉同学
* @date 2023/06/16
*/
@service
public class messagereceiver {
@autowired
private rabbittemplate rabbittemplate;
/**
* @param message 消息
* @description 消费者01
* @author it小辉同学
* @date 2023/06/16
*/
@rabbitlistener(queues = "queue01")
public void receiver01(string message) {
system.out.println("队列01——路由01:奔赴山海," + message);
}
/**
* @param message 消息
* @description 消费者012
* @author it小辉同学
* @date 2023/06/16
*/
@rabbitlistener(queues = "queue02")
public void receiver02(string message) {
system.out.println("队列02——路由02:向阳而生," + message);
}
}
测试
import com.xiaohui.service.messagesender;
import org.junit.jupiter.api.test;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.boot.test.context.springboottest;
@springboottest
public class mqtestdemo {
@autowired
private messagesender messagesender;
@test
public void testdemo1(){
messagesender.sendmessage("相信梦想。。。。。。");
}
}
rabbitmq路由模式
配置类
接着,定义一个交换机和两个队列:
@configuration
public class rabbitconfig {
public static final string exchange_name = "direct_exchange";
public static final string queue_name_1 = "queue_1";
public static final string queue_name_2 = "queue_2";
public static final string routing_key_1 = "key_1";
public static final string routing_key_2 = "key_2";
@bean
public directexchange directexchange() {
return new directexchange(exchange_name);
}
@bean
public queue queue1() {
return new queue(queue_name_1);
}
@bean
public queue queue2() {
return new queue(queue_name_2);
}
@bean
public binding binding1() {
return bindingbuilder.bind(queue1()).to(directexchange()).with(routing_key_1);
}
@bean
public binding binding2() {
return bindingbuilder.bind(queue2()).to(directexchange()).with(routing_key_2);
}
}
生产者
@service
public class messageproducer {
@autowired
private amqptemplate amqptemplate;
public void send(string message, string routingkey) {
amqptemplate.convertandsend(rabbitconfig.exchange_name, routingkey, message);
system.out.println("sent message: " + message + ", routing key: " + routingkey);
}
}
消费者
@service
public class messageconsumer {
@rabbitlistener(queues = rabbitconfig.queue_name_1)
public void receivefromqueue1(string message) {
system.out.println("received message from queue 1: " + message);
}
@rabbitlistener(queues = rabbitconfig.queue_name_2)
public void receivefromqueue2(string message) {
system.out.println("received message from queue 2: " + message);
}
}
测试
@service
public class testservice {
@autowired
private messageproducer producer;
@postconstruct
public void test() {
producer.send("hello, queue 1", rabbitconfig.routing_key_1);
producer.send("hello, queue 2", rabbitconfig.routing_key_2);
}
}
运行程序后,可以看到控制台输出:
sent message: hello, queue 1, routing key: key_1
sent message: hello, queue 2, routing key: key_2
received message from queue 1: hello, queue 1
received message from queue 2: hello, queue 2
说明消息成功发送到了指定的队列。
rabbitmq主题模式
配置类
@configuration
public class rabbitconfig {
public static final string exchange_name = "topic_exchange";
public static final string queue_name_1 = "queue_1";
public static final string queue_name_2 = "queue_2";
public static final string queue_name_3 = "queue_3";
public static final string routing_key_1 = "key_1.*";
public static final string routing_key_2 = "key_2.*";
public static final string routing_key_3 = "*.key_3";
@bean
public topicexchange topicexchange() {
return new topicexchange(exchange_name);
}
@bean
public queue queue1() {
return new queue(queue_name_1);
}
@bean
public queue queue2() {
return new queue(queue_name_2);
}
@bean
public queue queue3() {
return new queue(queue_name_3);
}
@bean
public binding binding1() {
return bindingbuilder.bind(queue1()).to(topicexchange()).with(routing_key_1);
}
@bean
public binding binding2() {
return bindingbuilder.bind(queue2()).to(topicexchange()).with(routing_key_2);
}
@bean
public binding binding3() {
return bindingbuilder.bind(queue3()).to(topicexchange()).with(routing_key_3);
}
}
生产者
在生产者端,发送消息到交换机,这里使用了三种不同的路由键:
@service
public class messageproducer {
@autowired
private amqptemplate amqptemplate;
public void send(string message, string routingkey) {
amqptemplate.convertandsend(rabbitconfig.exchange_name, routingkey, message);
system.out.println("sent message: " + message + ", routing key: " + routingkey);
}
}
消费者
在消费者端,使用通配符连接到交换机,并指定一个消费者:
@service
public class messageconsumer {
@rabbitlistener(queues = rabbitconfig.queue_name_1)
public void receivefromqueue1(string message) {
system.out.println("received message from queue 1: " + message);
}
@rabbitlistener(queues = rabbitconfig.queue_name_2)
public void receivefromqueue2(string message) {
system.out.println("received message from queue 2: " + message);
}
@rabbitlistener(queues = rabbitconfig.queue_name_3)
public void receivefromqueue3(string message) {
system.out.println("received message from queue 3: " + message);
}
}
测试
现在,我们来测试一下,这里发送了三条消息,分别匹配了不同的绑定键:
@service
public class testservice {
@autowired
private messageproducer producer;
@postconstruct
public void test() {
producer.send("hello, queue 1", rabbitconfig.routing_key_1);
producer.send("hello, queue 2", rabbitconfig.routing_key_2);
producer.send("hello, queue 3", rabbitconfig.routing_key_3);
}
}
运行程序后,可以看到控制台输出:
sent message: hello, queue 1, routing key: key_1.*
sent message: hello, queue 2, routing key: key_2.*
sent message: hello, queue 3, routing key: *.key_3
received message from queue 1: hello, queue 1
received message from queue 2: hello, queue 2
received message from queue 3: hello, queue 3
我的天哪,太不容易了,坑倒是不多,文字东西太多了,还不能违背初心去抄袭,花费时间很长!如果您看到这里,祝贺你,我们一起成长了!感谢相遇,再会有期!!!
发表评论