mqtt-springboot
创建简单 springboot 项目
导入必须依赖
pom.xml
<?xml version="1.0" encoding="utf-8"?> <project xmlns="http://maven.apache.org/pom/4.0.0" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" xsi:schemalocation="http://maven.apache.org/pom/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelversion>4.0.0</modelversion> <groupid>com.study</groupid> <artifactid>mqttdemo</artifactid> <version>0.0.1-snapshot</version> <name>springbootmqttdemo</name> <description>springbootmqttdemo</description> <properties> <java.version>1.8</java.version> <project.build.sourceencoding>utf-8</project.build.sourceencoding> <project.reporting.outputencoding>utf-8</project.reporting.outputencoding> <spring-boot.version>2.6.13</spring-boot.version> </properties> <dependencies> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter</artifactid> </dependency> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-test</artifactid> <scope>test</scope> </dependency> <!-- spring boot项目web开发的起步依赖 --> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-web</artifactid> </dependency> <!-- spring boot项目集成消息中间件基础依赖 --> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-integration</artifactid> </dependency> <!-- spring boot项目和mqtt客户端集成起步依赖 --> <dependency> <groupid>org.springframework.integration</groupid> <artifactid>spring-integration-mqtt</artifactid> <version>5.4.3</version> </dependency> <!-- lombok依赖 --> <dependency> <groupid>org.projectlombok</groupid> <artifactid>lombok</artifactid> </dependency> <!-- fastjson依赖 --> <dependency> <groupid>com.alibaba</groupid> <artifactid>fastjson</artifactid> <version>1.2.83</version> </dependency> </dependencies> <dependencymanagement> <dependencies> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-dependencies</artifactid> <version>${spring-boot.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencymanagement> <build> <plugins> <plugin> <groupid>org.apache.maven.plugins</groupid> <artifactid>maven-compiler-plugin</artifactid> <version>3.8.1</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>utf-8</encoding> </configuration> </plugin> <plugin> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-maven-plugin</artifactid> <version>${spring-boot.version}</version> <configuration> <mainclass>com.study.mqtt.demo.mqttdemoapplication</mainclass> <skip>true</skip> </configuration> <executions> <execution> <id>repackage</id> <goals> <goal>repackage</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
增加mqtt相关配置
application.yml
spring: mqtt: # mqtt 服务器地址 url: tcp://192.168.40.128:1883 # 订阅客户端id subclientid: sub_client_id_1 # 订阅主题 subtopic: lq/iot/demo/ # 发布客户端id pubclientid: pub_client_id_1 # 用户名 username: admin # 密码 password: admin123456
编写对应java类
配置类
mqttconfig.java
package com.study.mqtt.demo.domain; import lombok.data; import org.springframework.boot.context.properties.configurationproperties; @data @configurationproperties(prefix = "spring.mqtt") public class mqttconfig { private string username; private string password; private string url; private string subclientid ; private string subtopic ; private string pubclientid ; }
启动类增加开启配置
mqttdemoapplication.java
package com.study.mqtt.demo; import com.study.mqtt.demo.domain.mqttconfig; import org.springframework.boot.springapplication; import org.springframework.boot.autoconfigure.springbootapplication; import org.springframework.boot.context.properties.enableconfigurationproperties; @springbootapplication @enableconfigurationproperties(value = mqttconfig.class) public class mqttdemoapplication { public static void main(string[] args) { springapplication.run(mqttdemoapplication.class, args); } }
创建mqtt连接工厂类
mqttfactory.java
package com.study.mqtt.demo.factory; import com.study.mqtt.demo.domain.mqttconfig; import org.eclipse.paho.client.mqttv3.mqttconnectoptions; import org.springframework.beans.factory.annotation.autowired; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; import org.springframework.integration.mqtt.core.defaultmqttpahoclientfactory; import org.springframework.integration.mqtt.core.mqttpahoclientfactory; @configuration public class mqttfactory { @autowired private mqttconfig mqttconfig; @bean public mqttpahoclientfactory mqttclientfactory() { // 创建客户端工厂 defaultmqttpahoclientfactory factory = new defaultmqttpahoclientfactory(); mqttconnectoptions options = new mqttconnectoptions(); options.setusername(mqttconfig.getusername()); options.setpassword(mqttconfig.getpassword().tochararray()); options.setserveruris(new string[]{mqttconfig.geturl()}); options.setcleansession(true); factory.setconnectionoptions(options); return factory; } }
接收消息处理类
receivemsghandler.java
package com.study.mqtt.demo.handler; import org.springframework.messaging.message; import org.springframework.messaging.messagehandler; import org.springframework.messaging.messageheaders; import org.springframework.messaging.messagingexception; import org.springframework.stereotype.component; @component public class receivemsghandler implements messagehandler { @override public void handlemessage(message<?> message) throws messagingexception { system.out.println("接收到消息对象:" + message); // 消息内容 object payload = message.getpayload(); messageheaders headers = message.getheaders(); object mqttreceivedtopic = headers.get("mqtt_receivedtopic"); system.out.println("接收的消息主题:" + mqttreceivedtopic); system.out.println("接收的消息内容:" + payload); } }
接收消息配置类
mqttinboundconfig.java
package com.study.mqtt.demo.inbound; import com.study.mqtt.demo.domain.mqttconfig; import com.study.mqtt.demo.factory.mqttfactory; import com.study.mqtt.demo.handler.receivemsghandler; import org.springframework.beans.factory.annotation.autowired; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; import org.springframework.integration.annotation.serviceactivator; import org.springframework.integration.channel.directchannel; import org.springframework.integration.core.messageproducer; import org.springframework.integration.mqtt.core.mqttpahoclientfactory; import org.springframework.integration.mqtt.inbound.mqttpahomessagedrivenchanneladapter; import org.springframework.integration.mqtt.support.defaultpahomessageconverter; import org.springframework.messaging.messagechannel; import org.springframework.messaging.messagehandler; @configuration public class mqttinboundconfig { @autowired private mqttconfig mqttconfig ; @autowired private receivemsghandler receivemsghandler; /** * 配置消息接收通道 * @return */ @bean public messagechannel mqttinputchannel() { return new directchannel(); } /** * 配置接收适配器 */ @bean public messageproducer messageproducer(mqttpahoclientfactory mqttpahoclientfactory) { mqttpahomessagedrivenchanneladapter adapter = new mqttpahomessagedrivenchanneladapter(mqttconfig.geturl() , mqttconfig.getsubclientid() , mqttpahoclientfactory , mqttconfig.getsubtopic().split(",")) ; adapter.setconverter(new defaultpahomessageconverter()); // 质量服务等级 adapter.setqos(1); adapter.setoutputchannel(mqttinputchannel()); return adapter ; } /** * 配置接收消息处理器 * @return */ @bean @serviceactivator(inputchannel = "mqttinputchannel") // 指定处理消息使用得通道 public messagehandler messagehandler() { return this.receivemsghandler ; } }
发送消息配置类
mqttoutboundconfig.java
package com.study.mqtt.demo.outbound; import com.study.mqtt.demo.domain.mqttconfig; import org.springframework.beans.factory.annotation.autowired; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; import org.springframework.integration.annotation.serviceactivator; import org.springframework.integration.channel.directchannel; import org.springframework.integration.mqtt.core.mqttpahoclientfactory; import org.springframework.integration.mqtt.outbound.mqttpahomessagehandler; import org.springframework.messaging.messagechannel; import org.springframework.messaging.messagehandler; @configuration public class mqttoutboundconfig { @autowired private mqttconfig mqttconfig; @autowired private mqttpahoclientfactory pahoclientfactory ; @bean public messagechannel mqttoutputchannel() { return new directchannel(); } @bean @serviceactivator(inputchannel = "mqttoutputchannel") public messagehandler mqttoutboundmassagehandler() { mqttpahomessagehandler messagehandler = new mqttpahomessagehandler(mqttconfig.geturl() , mqttconfig.getpubclientid() , pahoclientfactory ) ; messagehandler.setasync(true); messagehandler.setdefaultqos(0); messagehandler.setdefaulttopic("default"); return messagehandler ; } }
发送消息网关接口类
mqttgateway.java
package com.study.mqtt.demo.gateway; import org.springframework.integration.annotation.messaginggateway; import org.springframework.integration.mqtt.support.mqttheaders; import org.springframework.messaging.handler.annotation.header; import org.springframework.stereotype.component; @messaginggateway(defaultrequestchannel = "mqttoutputchannel") public interface mqttgateway { /** * 发送mqtt消息 * @param topic 主题 * @param payload 内容 */ void sendtomqtt(@header(mqttheaders.topic) string topic, string payload); /** * 发送包含qos的消息 * @param topic 主题 * @param qos 对消息处理的几种机制。 * * 0 发送成功就算完成,会出现消息丢失 * * 1 增加消息重试机制,消息发送失败会重新发送,会出现重复消息 * * 2 多了一次去重的动作,确保只有一次消息推给订阅者。 * @param payload 消息体 */ void sendtomqtt(@header(mqttheaders.topic) string topic, @header(mqttheaders.qos) int qos, string payload); }
发送消息服务类
mqttmsgsenderservice.java
package com.study.mqtt.demo.service; import com.study.mqtt.demo.gateway.mqttgateway; import org.springframework.beans.factory.annotation.autowired; import org.springframework.stereotype.service; @service public class mqttmsgsenderservice { @autowired private mqttgateway mqttgateway; public void send(string topic, string payload) { mqttgateway.sendtomqtt(topic, payload); } public void send(string topic, int qos, string payload) { mqttgateway.sendtomqtt(topic, qos, payload); } }
测试验证
订阅消息验证
启动项目
发送消息
- 主题为配置文件中配置的订阅主题
lq/iot/demo/
- 发送时间:
2025-05-25 21:29:26:439
订阅收到消息
- 接收到消息的时间:
sun may 25 21:29:26 gmt+08:00 2025
- 接收到的主题:
lq/iot/demo/
- 接收到的内容:
{ "msg":"spring boot mqtt demo" }
发送消息验证
- 编写测试类
- 发送主题:
sb/mqtt/test
- 发送内容:
hello world !=> 当前时间
- 发送主题:
package com.study.mqtt.demo; import com.study.mqtt.demo.service.mqttmsgsenderservice; import org.junit.jupiter.api.test; import org.springframework.beans.factory.annotation.autowired; import org.springframework.boot.test.context.springboottest; import java.util.date; @springboottest(classes = mqttdemoapplication.class) class mqttdemoapplicationtests { @autowired private mqttmsgsenderservice mqttmsgsenderservice; @test void contextloads() { } @test void sendmsg(){ mqttmsgsenderservice.send("sb/mqtt/test", "hello world ! => " + new date()); } }
创建订阅者
订阅主题: sb/mqtt/test
运行测试类
订阅者接收消息
主题:sb/mqtt/test
到此这篇关于mqtt springboot整合的文章就介绍到这了,更多相关mqtt springboot整合内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论