一:前提
安装了emqx开源版、mqttx客户端
二:订阅发布实现步骤
1.引入依赖
<!--mqtt客户端-->
<dependency>
<groupid>org.eclipse.paho</groupid>
<artifactid>org.eclipse.paho.client.mqttv3</artifactid>
<version>1.2.2</version>
</dependency>2.编辑配置文件
mqtt:
broker:
uri: tcp://127.0.0.1:31883
client:
id: mqtt-am-client-${random.uuid}
# 订阅主题配置(支持多个)
intopics:
- topic: test/topic1
qos: 0
- topic: test/topic2
qos: 1
- topic: test/topic3
qos: 2
# 发布主题配置(支持多个)
outtopics:
- topic: out/topic1
qos: 0
username: am
password: lgyptuab4th5p
keepaliveinterval: 603.读取配置文件
package com.wtzn.web.config;
import lombok.data;
import org.springframework.boot.context.properties.configurationproperties;
import org.springframework.context.annotation.configuration;
import java.util.list;
@configuration
@configurationproperties(prefix = "mqtt")
@data
public class mqttproperties {
private broker broker;
private client client;
private list<topicconfig> intopics;
private list<topicconfig> outtopics;
private string username;
private string password;
private int keepaliveinterval;
@data
public static class broker {
private string uri;
}
@data
public static class client {
private string id;
}
@data
public static class topicconfig {
private string topic;
private int qos;
}
}4.创建mqtt客户端
package com.wtzn.web.config;
import org.eclipse.paho.client.mqttv3.mqttclient;
import org.eclipse.paho.client.mqttv3.mqttconnectoptions;
import org.eclipse.paho.client.mqttv3.mqttexception;
import org.eclipse.paho.client.mqttv3.persist.memorypersistence;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
@configuration
public class mqttconfig {
@autowired
private mqttproperties mqttproperties;
@bean
public mqttclient mqttclient() throws mqttexception {
mqttclient client = new mqttclient(mqttproperties.getbroker().geturi(), mqttproperties.getclient().getid(), new memorypersistence());
mqttconnectoptions options = new mqttconnectoptions();
// 此客户端的用户名和密码
options.setusername(mqttproperties.getusername());
options.setpassword(mqttproperties.getpassword().tochararray());
options.setcleansession(true);
// 设置遗嘱消息
// options.setwill(mqttproperties.getouttopic(), "我是mqtt-am-client,我已下线,这是我的遗嘱".getbytes(), 2, true);
// 连接超时重试
options.setconnectiontimeout(5000); //毫秒
options.setkeepaliveinterval(mqttproperties.getkeepaliveinterval());
options.setautomaticreconnect(true);//网络中断重连
client.connect(options);
return client;
}
}5.controller层
package com.wtzn.web.controller;
import cn.dev33.satoken.annotation.saignore;
import com.wtzn.common.json.utils.jsonutils;
import com.wtzn.web.domain.bo.payload;
import com.wtzn.web.service.mqttservice;
import lombok.extern.slf4j.slf4j;
import org.eclipse.paho.client.mqttv3.mqttexception;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.web.bind.annotation.*;
import java.util.linkedlist;
@restcontroller
@slf4j
@requestmapping("/mqtt")
public class mqttcontroller {
@autowired
private mqttservice mqttservice;
@saignore
@postmapping("/mqtt")
public void publish() {
try {
// linkedlist<payload> payloadlinkedlist=new linkedlist<>();
for(int i=1; i<=10000; i++){
payload payload=new payload();
payload.settemperature(i);
// payloadlinkedlist.add(payload);
mqttservice.publish("test/topic1",0,jsonutils.tojsonstring(payload));
}
} catch (mqttexception e) {
log.error("发布消息失败{}", e.getmessage());
}
log.info("发布消息成功");
}
}6.service层
package com.wtzn.web.service;
import com.wtzn.common.json.utils.jsonutils;
import com.wtzn.web.config.mqttproperties;
import com.wtzn.web.domain.bo.payload;
import jakarta.annotation.postconstruct;
import jakarta.annotation.predestroy;
import lombok.sneakythrows;
import lombok.extern.slf4j.slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.service;
import java.util.arrays;
@service
@slf4j
public class mqttservice implements mqttcallbackextended {
@autowired
private mqttclient mqttclient;
@autowired
private mqttproperties mqttproperties;
@postconstruct
public void init() throws mqttexception {
mqttclient.setcallback(this);
/* mqttclient.subscribe(mqttproperties.getintopic());
log.info("订阅主题{}", mqttproperties.getintopic());
*/
mqttproperties.getintopics().foreach(x -> {
try {
mqttclient.subscribe(x.gettopic(), x.getqos());
log.info("订阅主题{}", x.gettopic());
} catch (mqttexception e) {
throw new runtimeexception(e);
}
});
}
@predestroy
public void destroy() throws mqttexception {
mqttclient.disconnect();
log.info("与服务器断开连接");
}
/**
* @description: 发送消息
* @param: [message]
* @return: void
**/
public void publish(string topic,int qos,string message) throws mqttexception {
mqttmessage mqttmessage = new mqttmessage(message.getbytes());
mqttmessage.setqos(qos);
mqttclient.publish(topic, mqttmessage);
log.info("向主题【{}】发布消息:【{}】", topic, message);
}
/**
* @description: 接收消息
* @param: [topic, message]
* @return: void
**/
@override
public void messagearrived(string topic, mqttmessage message) throws mqttexception {
payload payload = jsonutils.parseobject(new string(message.getpayload()), payload.class);
log.info("接收到来自【{}】的消息【{}】", topic, payload.gettemperature());
/* if (payload.gettemperature() > 37) {
publish("发烧");
}*/
}
@override
public void connectionlost(throwable cause) {
log.error("连接丢失:{}", cause.getmessage());
}
@sneakythrows
@override
public void deliverycomplete(imqttdeliverytoken token) {
if( token!=null ){
mqttmessage message = null;
try {
message = token.getmessage();
} catch (mqttexception e) {
throw new runtimeexception(e);
}
string topic = token.gettopics()==null ? null : arrays.aslist(token.gettopics()).tostring();
string str = message==null ? null : new string(message.getpayload());
log.info("deliverycomplete: topic={}, message={}", topic, str);
} else {
log.info("deliverycomplete: null");
}
log.info("消息已送达");
}
@override
public void connectcomplete(boolean b, string s) {
mqttproperties.getintopics().foreach(x -> {
try {
mqttclient.subscribe(x.gettopic(), x.getqos());
log.info("订阅主题{}", x.gettopic());
} catch (mqttexception e) {
throw new runtimeexception(e);
}
});
}
}7.dao层
package com.wtzn.web.domain.bo;
import lombok.data;
@data
public class payload {
private integer temperature;
}三:测试
1.postman直接调用测试



2、下载mqttx客户端进行测试



总结
到此这篇关于java连接emqx实现订阅发布消息的文章就介绍到这了,更多相关java emqx订阅发布消息内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论