1、引入依赖
<!--mqtt start--> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-integration</artifactid> </dependency> <dependency> <groupid>org.springframework.integration</groupid> <artifactid>spring-integration-mqtt</artifactid> <version>5.4.4</version> </dependency> <!--mqtt end--> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-configuration-processor</artifactid> <optional>true</optional> </dependency>
2、增加yml配置
spring: mqtt: username: test password: test url: tcp://127.0.0.1:8080 subclientid: singo_sub_client_id_888 #订阅 客户端id pubclientid: singo_pub_client_id_888 #发布 客户端id connectiontimeout: 30 keepalive: 60
3、资源配置类
@data @configurationproperties(prefix = "spring.mqtt") public class mqttconfigurationproperties { private string username; private string password; private string url; private string subclientid; private string pubclientid; private int connectiontimeout; private int keepalive; }
注意启动类需要增加注解
@enableconfigurationproperties(mqttconfigurationproperties.class)
4、mqtt配置类
@configuration public class mqttconfig { @autowired private mqttconfigurationproperties mqttconfigurationproperties; /** * 连接参数 * * @return */ @bean public mqttconnectoptions mqttconnectoptions() { mqttconnectoptions options = new mqttconnectoptions(); options.setusername(mqttconfigurationproperties.getusername()); options.setpassword(mqttconfigurationproperties.getpassword().tochararray()); options.setserveruris(new string[]{mqttconfigurationproperties.geturl()}); options.setconnectiontimeout(mqttconfigurationproperties.getconnectiontimeout()); options.setkeepaliveinterval(mqttconfigurationproperties.getkeepalive()); options.setcleansession(true); // 设置为false以便断线重连后恢复会话 options.setautomaticreconnect(true); return options; } /** * 连接工厂 * * @param options * @return */ @bean public mqttpahoclientfactory mqttclientfactory(mqttconnectoptions options) { defaultmqttpahoclientfactory factory = new defaultmqttpahoclientfactory(); factory.setconnectionoptions(options); return factory; } /** * 消息输入通道 * 每次只有一个消息处理器可以消费消息。 * 当前消息的处理完成之前,新消息需要排队等待,无法并行处理。 * 默认是:单线程、顺序执行的 * @return */ // @bean // public directchannel mqttinputchannel() { // return new directchannel(); // } /** * 支持多线程并发处理消息的输入通道 * * @return */ @bean public executorchannel mqttinputchannel() { return new executorchannel(executors.newfixedthreadpool(10)); // 线程池大小可以调整 } /** * 配置入站适配器 * * @param mqttclientfactory * @return */ @bean public mqttpahomessagedrivenchanneladapter messagedrivenchanneladapter(mqttpahoclientfactory mqttclientfactory) { mqttpahomessagedrivenchanneladapter adapter = new mqttpahomessagedrivenchanneladapter(mqttconfigurationproperties.getsubclientid(), mqttclientfactory); // adapter.addtopic("pub/300119110099"); 订阅主题,也可以放在初始化动态配置 adapter.setoutputchannel(mqttinputchannel()); return adapter; } /** * 配置消息处理器 * * @return */ @bean @serviceactivator(inputchannel = "mqttinputchannel") // 指定通道 public messagehandler messagehandler() { return new mqttreceivermessagehandler(); } }
5、消息处理器配置
@slf4j @component public class mqttreceivermessagehandler implements messagehandler { @autowired private mqttmessageprocessingservice mqttmessageprocessingservice; @override public void handlemessage(message<?> message) throws messagingexception { messageheaders headers = message.getheaders(); log.info("线程名称:{},收到消息,主题:{},消息:{}", thread.currentthread().getname(), headers.get("mqtt_receivedtopic").tostring(), message.getpayload()); // log.info("收到消息主题:{}", headers.get("mqtt_receivedtopic").tostring()); // log.info("收到消息:{}", message.getpayload()); // 消息保存到内存队列里面,定时批量入库,也可以在这里直接入库 mqttmessageprocessingservice.addmessage(message.getpayload().tostring()); } }
6、消息主题缓存对象
@component public class mqtttopicstore { private final concurrenthashmap<string, string> topics = new concurrenthashmap<>(); public concurrenthashmap<string, string> gettopics() { return topics; } }
7、动态订阅数据库主题配置
@slf4j @component public class mqttinit { @autowired private mqttpahomessagedrivenchanneladapter messagedrivenchanneladapter; @autowired private mqtttopicstore mqtttopicstore; @postconstruct public void init() { subscribealltopics(); } public void subscribealltopics() { // list<mqtttopicconfig> topics = topicconfigmapper.findallenabled(); // for (mqtttopicconfig topic : topics) { // subscribetopic(topic); // } log.info("===================>从数据库里获取并初始化订阅所有主题"); list<string> topics = listutil.list(false, "pub/300119110099", "pub1/3010230209810018992", "pub1/30102302098100"); topics.stream().foreach(t -> { messagedrivenchanneladapter.addtopic(t); // 同时往mqtttopicstore.topics中增加一条记录用于缓存 }); } }
8、消息处理服务
@service public class mqttmessageprocessingservice { @autowired private mqttpahomessagedrivenchanneladapter messagedrivenchanneladapter; @autowired private mqtttopicstore mqtttopicstore; // 内存队列,用于暂存消息 private final blockingqueue<string> messagequeue = new linkedblockingqueue<>(); // 添加消息到队列 public void addmessage(string message) { messagequeue.add(message); } /** * 可以放到定时任务里面去,注入后取队列方便维护 * 定时任务,每5秒执行一次 ,建议2分钟一次 理想的触发间隔应略小于数据到达间隔,以确保及时处理和插入 * 如果每 5 分钟收到一条数据,可以设置任务执行周期为4 分钟或更短,以便任务有足够的时间处理数据,同时减少积压的可能性。 */ @scheduled(fixedrate = 1 * 60 * 1000) public void batchinserttodatabase() { system.out.println("定时任务执行中,当前队列大小:" + messagequeue.size()); list<string> batch = new arraylist<>(); messagequeue.drainto(batch, 500); // 一次性取最多500条消息 if (!batch.isempty()) { // 批量插入数据库 savemessagestodatabase(batch); } } private void savemessagestodatabase(list<string> messages) { // 假设这是批量插入逻辑 system.out.println("批量插入数据库,条数:" + messages.size()); for (string message : messages) { system.out.println("插入消息:" + message); } // 实际数据库操作代码 } /** * 订阅与取消订阅定时任务 */ public void subscribeandunsubscribetask() { // 从数据库获取所有主题,正常状态、删除状态 // 正常状态:判断mqtttopicstore.topics中是否存在,不存在则订阅,并在mqtttopicstore.topics中增加 // 删除状态: 判断mqtttopicstore.topics中是否存在,存在则取消订阅,并在mqtttopicstore.topics中删除 // messagedrivenchanneladapter.addtopic(t); } }
以上是简单的对接步骤,部分类、方法可以根据实际情况进行合并处理!!!!
9、定时任务
@slf4j @configuration @enablescheduling public class mqttjob { @value("${schedule.enable}") private boolean enable; @autowired private mqttmessageprocessingservice mqttmessageprocessingservice; /** * 定时订阅与取消订阅主题,从共享主题对象mqtttopicstore里面取出主题列表,然后进行订阅或取消订阅 * 每分钟一次 */ public void subscribeandunsubscribe() { if (!enable) return; mqttmessageprocessingservice.subscribeandunsubscribetask(); } /** * 定时处理队列里面的订阅消息,会有丢失风险,宕机时会丢失队列里面的消息 * 每分钟一次 要考虑一次消息处理的时间;也可先不使用队列,每次收到消息直接实时入库,有性能问题时在启用 */ public void batchsavesubscribemessage() { } }
到此这篇关于springboot 集成mqtt实现消息订阅的文章就介绍到这了,更多相关springboot mqtt消息订阅内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论