基于rabbitmq的mqtt
rabbitmq的官方文档地址:https://www.rabbitmq.com/docs/mqtt
目前已经支持的mqtt版本如下:3.1
、3.1.1
、5.0
1、创建rabbitmq 服务
> docker run -d --name rabbitmq --restart always -p 15672:15672 -p 5672:5672 -p 1883:1883 rabbitmq:3-management
1.1、端口说明
在使用 docker 部署 rabbitmq 并开通 mqtt 服务时,你需要确保以下端口在安全组中是开放的:
-
amqp 端口(默认端口:
5672
): 这是 rabbitmq 使用的端口,用于 amqp 协议。确保你的安全组允许来自需要访问 rabbitmq 的客户端的流量通过这个端口。 -
管理界面端口(默认端口:
15672
): 如果你需要访问 rabbitmq 的 web 管理界面,你需要开放这个端口。 -
mqtt 端口(默认端口:
1883
): 这是 mqtt 服务使用的端口。确保这个端口也被安全组允许通过,以便 mqtt 客户端可以连接到 rabbitmq 的 mqtt 服务。
在我们启动docker container时,没有修改映射的端口,则可以直接使用如上端口
根据你的安全策略和网络配置,你可能还需要考虑其他端口或配置,例如 tls 加密通信等。确保只开放必要的端口,并采取适当的安全措施来保护你的 rabbitmq 服务。
1.2、dashboard - 检查 5672、1883 端口
telnet 127.0.0.1 5672
telnet 127.0.0.1 1883
1.3、dashboard - 检查 15672 端口
2、配置mqtt服务
2.1、docker 容器中配置 mqtt 插件
要在 docker 容器中配置 mqtt 插件: 进入 rabbitmq 容器,启用 mqtt 插件,退出后并重新启动 rabbitmq docker服务。
docker exec -it rabbitmq /bin/bash
rabbitmq-plugins enable rabbitmq_mqtt
# 然后退出容器
exit
重启rabbitmq的docker服务
docker restart rabbitmq
2.2、配置mqtt账号密码
在默认情况下,rabbitmq 安装后是没有账号密码的,也就是说初始情况下,你可以直接连接到 rabbitmq 服务。但是,为了安全起见,你应该设置一个密码来保护 rabbitmq 服务器。
你可以通过以下步骤设置 rabbitmq 的账号密码:(无需重启docker
)
启用 rabbitmq 管理插件: 如果你还没有启用 rabbitmq 的管理插件,你可以使用以下命令启用它:
rabbitmq-plugins enable rabbitmq_management
创建用户: 使用以下命令创建一个新用户(例如,用户名为 “admin”):
rabbitmqctl add_user admin your_password
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
这会创建一个名为 “admin” 的用户,并将密码设置为 “your_password”。记得将 “your_password” 替换为你想要设置的密码
。
授予管理员权限: 使用以下命令为新用户授予管理员权限:
rabbitmqctl set_user_tags admin administrator
设置权限: 最后,你可能需要设置权限,以便用户可以执行所需的操作。例如,你可以使用以下命令授予用户对所有虚拟主机的完全权限:
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
这将为用户 “admin” 授予对所有虚拟主机的所有操作权限。
通过执行这些步骤,你就可以设置一个账号密码来保护 rabbitmq 服务器了。确保密码足够安全,并且只向需要访问 rabbitmq 的用户分享。
3、测试mqtt链接
3.1、golang 链接mqtt demo
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
func onmessagereceived(client mqtt.client, message mqtt.message) {
fmt.printf("received message on topic: %s\n", message.topic())
fmt.printf("message: %s\n", message.payload())
}
func main() {
// 创建 mqtt 连接参数
opts := mqtt.newclientoptions().addbroker("tcp://your-address:your-port")
opts.setclientid("go_client")
opts.setusername("your-username") // 设置用户名
opts.setpassword("your-password") // 设置密码
// 创建 mqtt 客户端实例
client := mqtt.newclient(opts)
// 连接到 mqtt 代理
if token := client.connect(); token.wait() && token.error() != nil {
panic(token.error())
}
// 订阅主题
if token := client.subscribe("test/topic", 0, onmessagereceived); token.wait() && token.error() != nil {
panic(token.error())
}
fmt.println("connected to mqtt broker")
// 等待信号
sig := make(chan os.signal, 1)
signal.notify(sig, syscall.sigint, syscall.sigterm)
<-sig
// 断开连接
client.disconnect(250)
fmt.println("disconnected from mqtt broker")
}
发表评论