在 Spring Boot 中对接 MQTT 协议,可以使用 Eclipse Paho 客户端和 Spring Integration MQTT 模块。以下是详细实现步骤:
1. 添加依赖
<dependencies>
    <!-- Spring Boot starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!-- Spring MVC: needed by the REST controller in step 7 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring Integration MQTT -->
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-mqtt</artifactId>
    </dependency>
    <!-- Eclipse Paho MQTT client -->
    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        <version>1.2.5</version>
    </dependency>
    <!-- Lombok: needed by the @Data annotation on the properties class in step 4.
         No version given: assumes the Spring Boot parent/BOM manages it. -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>
2. 配置 MQTT 连接参数
# application.yml
mqtt:
  broker-url: tcp://localhost:1883
  username: admin
  password: password
  client-id: spring-boot-client
  default-topic: test/topic
  # Passed to MqttConnectOptions.setConnectionTimeout (seconds)
  timeout: 30
  # Passed to MqttConnectOptions.setKeepAliveInterval (seconds)
  keepalive: 60
  # Passed to the inbound adapter's setCompletionTimeout (milliseconds)
  completion-timeout: 30000
3. MQTT 配置类
@configuration
@enableconfigurationproperties(mqttproperties.class)
public class mqttconfig {
@autowired
private mqttproperties mqttproperties;
// mqtt连接配置
@bean
public mqttconnectoptions mqttconnectoptions() {
mqttconnectoptions options = new mqttconnectoptions();
options.setserveruris(new string[]{mqttproperties.getbrokerurl()});
options.setusername(mqttproperties.getusername());
options.setpassword(mqttproperties.getpassword().tochararray());
options.setconnectiontimeout(mqttproperties.gettimeout());
options.setkeepaliveinterval(mqttproperties.getkeepalive());
options.setautomaticreconnect(true);
options.setcleansession(true);
return options;
}
// mqtt客户端工厂
@bean
public mqttpahoclientfactory mqttclientfactory() {
defaultmqttpahoclientfactory factory = new defaultmqttpahoclientfactory();
factory.setconnectionoptions(mqttconnectoptions());
return factory;
}
// 出站消息通道(用于发送消息)
@bean
@serviceactivator(inputchannel = "mqttoutboundchannel")
public messagehandler mqttoutbound() {
mqttpahomessagehandler messagehandler =
new mqttpahomessagehandler(mqttproperties.getclientid() + "-producer", mqttclientfactory());
messagehandler.setasync(true);
messagehandler.setdefaulttopic(mqttproperties.getdefaulttopic());
return messagehandler;
}
// 出站通道
@bean
public messagechannel mqttoutboundchannel() {
return new directchannel();
}
// 入站消息适配器(用于接收消息)
@bean
public messageproducer inbound() {
mqttpahomessagedrivenchanneladapter adapter =
new mqttpahomessagedrivenchanneladapter(mqttproperties.getclientid() + "-consumer",
mqttclientfactory(), mqttproperties.getdefaulttopic());
adapter.setcompletiontimeout(mqttproperties.getcompletiontimeout());
adapter.setconverter(new defaultpahomessageconverter());
adapter.setqos(1);
adapter.setoutputchannel(mqttinboundchannel());
return adapter;
}
// 入站通道
@bean
public messagechannel mqttinboundchannel() {
return new directchannel();
}
// 入站消息处理器
@bean
@serviceactivator(inputchannel = "mqttinboundchannel")
public messagehandler handler() {
return new messagehandler() {
@override
public void handlemessage(message<?> message) throws messagingexception {
string topic = (string) message.getheaders().get("mqtt_receivedtopic");
string payload = (string) message.getpayload();
system.out.println("received message from topic: " + topic + ", payload: " + payload);
// 处理接收到的消息
processmessage(topic, payload);
}
};
}
}
4. 配置属性类
/**
 * Type-safe holder for the {@code mqtt.*} settings in {@code application.yml}.
 *
 * <p>Registered as a bean through {@code @Component}; accessors are generated
 * by Lombok's {@code @Data}. The three timing fields default to the values
 * used in the sample configuration so that an omitted key does not silently
 * become 0.
 */
@ConfigurationProperties(prefix = "mqtt")
@Component
@Data
public class MqttProperties {
    /** Broker URI, e.g. {@code tcp://localhost:1883}. */
    private String brokerUrl;
    /** Login name; may be left unset. */
    private String username;
    /** Login password; may be left unset. */
    private String password;
    /** Base client id; the configuration appends a suffix per connection. */
    private String clientId;
    /** Topic used for subscribing and for publishing when none is given. */
    private String defaultTopic;
    /** Connection timeout handed to the Paho connect options. */
    private int timeout = 30;
    /** Keep-alive interval handed to the Paho connect options. */
    private int keepalive = 60;
    /** Completion timeout handed to the inbound adapter. */
    private int completionTimeout = 30000;
}
5. MQTT 服务类
/**
 * Publishing facade: builds Spring messages and sends them to the outbound
 * MQTT channel.
 */
@Service
public class MqttService {

    // Several MessageChannel beans exist in the context; the field name is
    // kept equal to the bean name so that injection resolves to the outbound one.
    @Autowired
    private MessageChannel mqttOutboundChannel;

    /**
     * Publishes a message to the given topic.
     *
     * @param topic   destination topic, placed in the {@code mqtt_topic} header
     * @param message payload to publish
     */
    public void sendMessage(String topic, String message) {
        mqttOutboundChannel.send(MessageBuilder.withPayload(message)
                .setHeader("mqtt_topic", topic)
                .build());
    }

    /**
     * Publishes a message without a topic header, leaving the outbound handler
     * to use its default topic.
     *
     * @param message payload to publish
     */
    public void sendMessage(String message) {
        mqttOutboundChannel.send(MessageBuilder.withPayload(message).build());
    }

    /**
     * Publishes a message to the given topic with an explicit QoS level.
     *
     * @param topic   destination topic, placed in the {@code mqtt_topic} header
     * @param message payload to publish
     * @param qos     MQTT quality of service: 0, 1 or 2
     * @throws IllegalArgumentException if {@code qos} is outside 0..2
     */
    public void sendMessage(String topic, String message, int qos) {
        // Reject invalid levels here so the caller gets a clear error before
        // the message reaches the channel.
        if (qos < 0 || qos > 2) {
            throw new IllegalArgumentException("qos must be 0, 1 or 2 but was " + qos);
        }
        mqttOutboundChannel.send(MessageBuilder.withPayload(message)
                .setHeader("mqtt_topic", topic)
                .setHeader("mqtt_qos", qos)
                .build());
    }
}
6. 消息处理器
/**
 * Routes received MQTT messages to a topic-specific handler.
 */
@Component
public class MqttMessageProcessor {

    private static final Logger logger = LoggerFactory.getLogger(MqttMessageProcessor.class);

    /**
     * Logs the message and dispatches it according to its topic.
     *
     * @param topic   topic the message arrived on; {@code null} is treated as
     *                an unknown topic
     * @param payload message body
     */
    public void processMessage(String topic, String payload) {
        logger.info("processing mqtt message - topic: {}, payload: {}", topic, payload);

        // A switch on a null String throws NullPointerException, so a missing
        // topic is sent to the default handler explicitly.
        if (topic == null) {
            handleDefaultMessage(null, payload);
            return;
        }

        // Different topics get different handling.
        switch (topic) {
            case "test/topic":
                handleTestTopic(payload);
                break;
            case "sensor/data":
                handleSensorData(payload);
                break;
            default:
                handleDefaultMessage(topic, payload);
        }
    }

    /** Handles messages from {@code test/topic}. */
    private void handleTestTopic(String payload) {
        logger.info("处理测试主题消息: {}", payload);
        // Business logic goes here.
    }

    /** Handles messages from {@code sensor/data}; parsing is left as a placeholder. */
    private void handleSensorData(String payload) {
        logger.info("处理传感器数据: {}", payload);
        try {
            // Parse the JSON payload here, for example:
            // ObjectMapper mapper = new ObjectMapper();
            // SensorData data = mapper.readValue(payload, SensorData.class);
        } catch (Exception e) {
            logger.error("解析传感器数据失败", e);
        }
    }

    /** Handles messages from any topic without a dedicated handler. */
    private void handleDefaultMessage(String topic, String payload) {
        logger.info("处理默认消息 - topic: {}, payload: {}", topic, payload);
    }
}
7. 控制器示例
/**
 * REST entry points for publishing MQTT messages through {@link MqttService}.
 */
@RestController
@RequestMapping("/mqtt")
public class MqttController {

    @Autowired
    private MqttService mqttService;

    /**
     * Publishes {@code message} to {@code topic}.
     *
     * @param topic   destination topic
     * @param message payload to publish
     * @return 200 with a confirmation text, or 500 with the failure message
     */
    @PostMapping("/publish")
    public ResponseEntity<String> publishMessage(@RequestParam String topic,
                                                 @RequestParam String message) {
        return respond(() -> mqttService.sendMessage(topic, message),
                "message published successfully");
    }

    /**
     * Publishes {@code message} to the default topic.
     *
     * @param message payload to publish
     * @return 200 with a confirmation text, or 500 with the failure message
     */
    @PostMapping("/publish/default")
    public ResponseEntity<String> publishToDefaultTopic(@RequestParam String message) {
        return respond(() -> mqttService.sendMessage(message),
                "message published to default topic");
    }

    /**
     * Runs a publish action and maps its outcome to an HTTP response.
     * Shared by both endpoints so the error mapping lives in one place.
     */
    private ResponseEntity<String> respond(Runnable publishAction, String successBody) {
        try {
            publishAction.run();
            return ResponseEntity.ok(successBody);
        } catch (Exception e) {
            // NOTE(review): the exception message is returned to the client as-is;
            // confirm that is acceptable outside a demo.
            return ResponseEntity.status(500).body("failed to publish message: " + e.getMessage());
        }
    }
}
8. 主应用类
/**
 * Application entry point.
 */
@SpringBootApplication
@EnableConfigurationProperties
public class MqttApplication {

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(MqttApplication.class, args);
    }
}
9. 测试 MQTT 服务
可以使用 MQTT.fx 或其他 MQTT 客户端工具进行测试:
- 启动 Spring Boot 应用
- 使用 MQTT 客户端订阅主题 test/topic
- 调用 API 发送消息(注意 -X POST 必须大写,消息中的空格需要进行 URL 编码):
curl -X POST "http://localhost:8080/mqtt/publish?topic=test/topic&message=hello%20mqtt"
主要特性
- 自动重连: 通过 setAutomaticReconnect(true) 配置了自动重连机制
- QoS 支持: 支持不同的服务质量等级(0、1、2)
- 多主题订阅: 入站适配器支持订阅多个主题(本示例只订阅了默认主题,可在构造适配器时传入多个主题)
- 异步处理: 消息发送支持异步模式
- 配置灵活: 通过配置文件管理连接参数
这样你就实现了一个完整的 Spring Boot MQTT 集成方案,可以方便地进行消息的发布和订阅。
到此这篇关于 Spring Boot 对接 MQTT 的项目实践的文章就介绍到这了。更多相关 Spring Boot 对接 MQTT 的内容,请搜索代码网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持代码网!
发表评论