一:前提
已安装 EMQX 开源版和 MQTTX 客户端
二:订阅发布实现步骤
1.引入依赖
<!-- MQTT client (Eclipse Paho) -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.2</version>
</dependency>
2.编辑配置文件
# MQTT settings; every key below lives under the `mqtt` prefix.
mqtt:
  broker:
    uri: tcp://127.0.0.1:31883
  client:
    # ${random.uuid} gives each application instance its own client id.
    id: mqtt-am-client-${random.uuid}
  # Topics to subscribe to (multiple entries supported)
  inTopics:
    - topic: test/topic1
      qos: 0
    - topic: test/topic2
      qos: 1
    - topic: test/topic3
      qos: 2
  # Topics to publish to (multiple entries supported)
  outTopics:
    - topic: out/topic1
      qos: 0
  # NOTE(review): sample credentials; prefer an environment variable or a secret store over committing a real password.
  username: am
  password: lgyptuab4th5p
  # Keep-alive interval, in seconds
  keepAliveInterval: 60
3.读取配置文件
package com.wtzn.web.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.ArrayList;
import java.util.List;

/**
 * Typed view of the configuration entries under the {@code mqtt} prefix.
 *
 * <p>Getters and setters are generated by Lombok's {@code @Data}.
 */
@Configuration
@ConfigurationProperties(prefix = "mqtt")
@Data
public class MqttProperties {

    /** Broker connection settings ({@code mqtt.broker.*}). */
    private Broker broker;

    /** Client identity settings ({@code mqtt.client.*}). */
    private Client client;

    /**
     * Topics to subscribe to. Defaults to an empty list so that code iterating
     * over it does not fail when the section is absent from the configuration.
     */
    private List<TopicConfig> inTopics = new ArrayList<>();

    /** Topics to publish to. Defaults to an empty list for the same reason. */
    private List<TopicConfig> outTopics = new ArrayList<>();

    /** User name presented to the broker. */
    private String username;

    /** Password presented to the broker. */
    private String password;

    /** Keep-alive interval handed to the MQTT connect options. */
    private int keepAliveInterval;

    /** Holder for {@code mqtt.broker.*}. */
    @Data
    public static class Broker {
        /** Broker URI, e.g. {@code tcp://host:port}. */
        private String uri;
    }

    /** Holder for {@code mqtt.client.*}. */
    @Data
    public static class Client {
        /** MQTT client identifier. */
        private String id;
    }

    /** A topic name paired with the quality-of-service level to use for it. */
    @Data
    public static class TopicConfig {
        private String topic;
        private int qos;
    }
}
4.创建mqtt客户端
package com.wtzn.web.config; import org.eclipse.paho.client.mqttv3.mqttclient; import org.eclipse.paho.client.mqttv3.mqttconnectoptions; import org.eclipse.paho.client.mqttv3.mqttexception; import org.eclipse.paho.client.mqttv3.persist.memorypersistence; import org.springframework.beans.factory.annotation.autowired; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; @configuration public class mqttconfig { @autowired private mqttproperties mqttproperties; @bean public mqttclient mqttclient() throws mqttexception { mqttclient client = new mqttclient(mqttproperties.getbroker().geturi(), mqttproperties.getclient().getid(), new memorypersistence()); mqttconnectoptions options = new mqttconnectoptions(); // 此客户端的用户名和密码 options.setusername(mqttproperties.getusername()); options.setpassword(mqttproperties.getpassword().tochararray()); options.setcleansession(true); // 设置遗嘱消息 // options.setwill(mqttproperties.getouttopic(), "我是mqtt-am-client,我已下线,这是我的遗嘱".getbytes(), 2, true); // 连接超时重试 options.setconnectiontimeout(5000); //毫秒 options.setkeepaliveinterval(mqttproperties.getkeepaliveinterval()); options.setautomaticreconnect(true);//网络中断重连 client.connect(options); return client; } }
5.controller层
package com.wtzn.web.controller;

import cn.dev33.satoken.annotation.SaIgnore;
import com.wtzn.common.json.utils.JsonUtils;
import com.wtzn.web.domain.bo.Payload;
import com.wtzn.web.service.MqttService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.LinkedList;

/**
 * Demo endpoint that publishes a burst of test messages through {@link MqttService}.
 */
@RestController
@Slf4j
@RequestMapping("/mqtt")
public class MqttController {

    /** Topic the demo messages are published to. */
    private static final String DEMO_TOPIC = "test/topic1";

    /** QoS level used for the demo messages. */
    private static final int DEMO_QOS = 0;

    /** Number of messages sent per request. */
    private static final int DEMO_MESSAGE_COUNT = 10000;

    @Autowired
    private MqttService mqttService;

    /**
     * Publishes {@code DEMO_MESSAGE_COUNT} payloads with temperatures 1..N, serialized as JSON.
     *
     * <p>Publishing stops at the first failure. The failure is logged and not rethrown,
     * so this method returns normally either way.
     */
    @SaIgnore
    @PostMapping("/mqtt")
    public void publish() {
        try {
            for (int i = 1; i <= DEMO_MESSAGE_COUNT; i++) {
                Payload payload = new Payload();
                payload.setTemperature(i);
                mqttService.publish(DEMO_TOPIC, DEMO_QOS, JsonUtils.toJsonString(payload));
            }
            // Logged only when every message went out; previously this line also
            // ran after a failure had just been reported.
            log.info("发布消息成功");
        } catch (MqttException e) {
            // The exception is passed as the last argument so the stack trace is kept.
            log.error("发布消息失败{}", e.getMessage(), e);
        }
    }
}
6.service层
package com.wtzn.web.service;

import com.wtzn.common.json.utils.JsonUtils;
import com.wtzn.web.config.MqttProperties;
import com.wtzn.web.domain.bo.Payload;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

/**
 * Publishes messages and handles the callbacks of the shared {@link MqttClient}.
 *
 * <p>The service registers itself as the client's callback, subscribes to every configured
 * inbound topic at startup, and subscribes again whenever a connection is (re)established.
 */
@Service
@Slf4j
public class MqttService implements MqttCallbackExtended {

    @Autowired
    private MqttClient mqttClient;

    @Autowired
    private MqttProperties mqttProperties;

    /**
     * Registers this service as the client callback and subscribes to the configured topics.
     *
     * @throws MqttException if a subscription fails; startup is aborted in that case
     */
    @PostConstruct
    public void init() throws MqttException {
        mqttClient.setCallback(this);
        subscribeConfiguredTopics();
    }

    /**
     * Disconnects from the broker if still connected, then releases the client's resources.
     *
     * @throws MqttException if the disconnect itself fails
     */
    @PreDestroy
    public void destroy() throws MqttException {
        // disconnect() on a client that is not connected throws, so check first.
        if (mqttClient.isConnected()) {
            mqttClient.disconnect();
            log.info("与服务器断开连接");
        }
        try {
            mqttClient.close();
        } catch (MqttException e) {
            // Best effort during shutdown: report the problem but do not fail the shutdown.
            log.warn("关闭MQTT客户端失败:{}", e.getMessage(), e);
        }
    }

    /**
     * Publishes a text message.
     *
     * @param topic   topic to publish to
     * @param qos     quality-of-service level for the message
     * @param message message text, sent as UTF-8 bytes
     * @throws MqttException if the client fails to publish
     */
    public void publish(String topic, int qos, String message) throws MqttException {
        // An explicit charset keeps the bytes identical on every platform.
        MqttMessage mqttMessage = new MqttMessage(message.getBytes(StandardCharsets.UTF_8));
        mqttMessage.setQos(qos);
        mqttClient.publish(topic, mqttMessage);
        log.info("向主题【{}】发布消息:【{}】", topic, message);
    }

    /**
     * Handles an inbound message: decodes it as UTF-8 JSON into a {@link Payload} and logs
     * its temperature.
     *
     * <p>Parsing problems are logged rather than propagated, because Paho shuts the client
     * down when this callback throws.
     *
     * @param topic   topic the message arrived on
     * @param message the received message
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws MqttException {
        String body = new String(message.getPayload(), StandardCharsets.UTF_8);
        try {
            Payload payload = JsonUtils.parseObject(body, Payload.class);
            if (payload == null) {
                log.warn("主题【{}】的消息内容为空或无法解析:【{}】", topic, body);
                return;
            }
            log.info("接收到来自【{}】的消息【{}】", topic, payload.getTemperature());
        } catch (RuntimeException e) {
            // Callback boundary: one malformed message must not take the connection down.
            log.error("处理主题【{}】的消息失败:【{}】", topic, body, e);
        }
    }

    /**
     * Logs that the connection to the broker was lost.
     *
     * @param cause the reason reported by the client
     */
    @Override
    public void connectionLost(Throwable cause) {
        log.error("连接丢失:{}", cause == null ? null : cause.getMessage(), cause);
    }

    /**
     * Logs the topics and, when still available, the payload of a delivered message.
     *
     * @param token delivery token of the completed publish; may be {@code null}
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        if (token == null) {
            log.info("deliverycomplete: null");
        } else {
            MqttMessage delivered = null;
            try {
                delivered = token.getMessage();
            } catch (MqttException e) {
                // Only the log detail is lost; do not throw from the callback thread.
                log.warn("读取已送达消息失败:{}", e.getMessage(), e);
            }
            String topics = token.getTopics() == null ? null : Arrays.toString(token.getTopics());
            String text = delivered == null
                    ? null
                    : new String(delivered.getPayload(), StandardCharsets.UTF_8);
            log.info("deliverycomplete: topic={}, message={}", topics, text);
        }
        log.info("消息已送达");
    }

    /**
     * Subscribes to the configured topics again after a connection has been established.
     * The client connects with a clean session, so subscriptions must be re-created.
     *
     * @param reconnect whether this connection is the result of an automatic reconnect
     * @param serverUri URI of the server the connection was made to
     */
    @Override
    public void connectComplete(boolean reconnect, String serverUri) {
        try {
            subscribeConfiguredTopics();
        } catch (MqttException e) {
            // Report the failure instead of throwing from the client's callback thread.
            log.error("重新订阅主题失败:{}", e.getMessage(), e);
        }
    }

    /** Subscribes to every configured inbound topic, stopping at the first failure. */
    private void subscribeConfiguredTopics() throws MqttException {
        List<MqttProperties.TopicConfig> topics = mqttProperties.getInTopics();
        if (topics == null) {
            return;
        }
        for (MqttProperties.TopicConfig topic : topics) {
            mqttClient.subscribe(topic.getTopic(), topic.getQos());
            log.info("订阅主题{}", topic.getTopic());
        }
    }
}
7.实体类(BO)
package com.wtzn.web.domain.bo;

import lombok.Data;

/**
 * Message body exchanged over MQTT as JSON.
 *
 * <p>Accessors, {@code equals}, {@code hashCode} and {@code toString} are generated by
 * Lombok's {@code @Data}.
 */
@Data
public class Payload {

    /** Temperature reading; boxed, so it is {@code null} when the JSON field is absent. */
    private Integer temperature;
}
三:测试
1.postman直接调用测试
2.下载 MQTTX 客户端进行测试
总结
到此,这篇关于 Java 连接 EMQX 实现订阅发布消息的文章就介绍到这了。更多相关 Java EMQX 订阅发布消息的内容,请搜索代码网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持代码网!
发表评论