使用版本
"mqtt": "^5.8.0",
安装指令
npm install mqtt --save ------ yarn add mqtt
配置
connection: {
protocol: "ws",
host: "broker.emqx.io",
port: 8083,
endpoint: "/mqtt",
clean: true,
connecttimeout: 30 * 1000, // ms
reconnectperiod: 4000, // ms
clientid: "emqx_vue_" + math.random().tostring(16).substring(2, 8),
// 随机数 每次不能重复
username: "emqx_test",
password: "emqx_test",
},
连接
import mqtt from "mqtt";
let client = {}
client = mqtt.connect(url, options)
client.on('connect', (e) => {
// 订阅主题
})
订阅主题
client.subscribe(topic, { qos: 1 }, (err) => {
if (!err) {
console.log('订阅成功')
} else {
console.log('消息订阅失败!')
}
})
消息发布
给后端发送格式,是和后端约定好的数据格式,一般为json传输。
client.publish(publishtopic, `{"messagetype":1,"messagecontent":""}`, { qos: 0 }, (err) => {
if (!err) {
console.log('发送成功')
client.subscribe(topic, { qos: 1 }, (err) => {
if (!err) {
console.log('订阅成功')
} else {
console.log('消息订阅失败!')
}
})
} else {
console.log('消息发送失败!')
}
})
取消订阅
client.unsubscribe(topiclist, (error) => {
console.log('主题为' + topiclist + '取消订阅成功', error)
})
断开连接
export function unconnect() {
client.end()
client = null
// message.warning('服务器已断开连接!')
console.log('服务器已断开连接!')
}
mqtt封装使用(ts版)
import type { iclientoptions, mqttclient } from 'mqtt';
import mqtt from 'mqtt';
interface clientoptions extends iclientoptions {
clientid: string;
}
interface subscribeoptions {
topic: string;
callback: (topic: string, message: string) => void;
subscribeoption?: mqtt.iclientsubscribeoptions;
}
interface publishoptions {
topic: string;
message: string;
}
class mqtt {
private static instance: mqtt;
private client: mqttclient | undefined;
private subscribemembers: record<string, ((topic: string, message: string) => void) | undefined> = {};
private pendingsubscriptions: subscribeoptions[] = [];
private pendingpublications: publishoptions[] = [];
private isconnected: boolean = false;
private constructor(url?: string) {
if (url) {
this.connect(url);
}
}
public static getinstance(url?: string): mqtt {
if (!mqtt.instance) {
mqtt.instance = new mqtt(url);
} else if (url && !mqtt.instance.client) {
mqtt.instance.connect(url);
}
return mqtt.instance;
}
private connect(url: string): void {
console.log(url, clientoptions);
if (!this.client) {
this.client = mqtt.connect(url, clientoptions);
this.client.on('connect', this.onconnect);
this.client.on('reconnect', this.onreconnect);
this.client.on('error', this.onerror);
this.client.on('message', this.onmessage);
}
}
public disconnect(): void {
if (this.client) {
this.client.end();
this.client = undefined;
this.subscribemembers = {};
this.isconnected = false;
console.log(`服务器已断开连接!`);
}
}
public subscribe({ topic, callback }: subscribeoptions): void {
if (this.isconnected) {
this.client?.subscribe(topic, { qos: 1 }, error => {
if (error) {
console.log(`客户端: ${clientoptions.clientid}, 订阅主题: ${topic}失败: `, error);
} else {
console.log(`客户端: ${clientoptions.clientid}, 订阅主题: ${topic}成功`);
}
});
this.subscribemembers[topic] = callback;
} else {
this.pendingsubscriptions.push({ topic, callback });
}
}
public unsubscribe(topic: string): void {
if (!this.client) {
return;
}
this.client.unsubscribe(topic, error => {
if (error) {
console.log(`客户端: ${clientoptions.clientid}, 取消订阅主题: ${topic}失败: `, error);
} else {
console.log(`客户端: ${clientoptions.clientid}, 取消订阅主题: ${topic}成功`);
}
});
this.subscribemembers[topic] = undefined;
}
public publish({ topic, message }: publishoptions): void {
if (this.isconnected) {
this.client?.publish(topic, message, { qos: 1 }, e => {
if (e) {
console.log(`客户端: ${clientoptions.clientid}, 发送主题为: ${topic} 的消息, 发送失败: `, e);
}
});
} else {
this.pendingpublications.push({ topic, message });
}
}
private onconnect = (e: any): void => {
console.log(`客户端: ${clientoptions.clientid}, 连接服务器成功:`, e);
this.isconnected = true;
this.processpendingsubscriptions();
this.processpendingpublications();
};
private onreconnect = (): void => {
console.log(`客户端: ${clientoptions.clientid}, 正在重连:`);
this.isconnected = false;
};
private onerror = (error: error): void => {
console.log(`客户端: ${clientoptions.clientid}, 连接失败:`, error);
this.isconnected = false;
};
private onmessage = (topic: string, message: buffer): void => {
// console.log(
// `客户端: ${clientoptions.clientid}, 接收到来自主题: ${topic} 的消息: `,
// message.tostring(),
// );
const callback = this.subscribemembers?.[topic];
callback?.(topic, message.tostring());
};
private processpendingsubscriptions(): void {
while (this.pendingsubscriptions.length > 0) {
const { topic, callback, subscribeoption } = this.pendingsubscriptions.shift()!;
this.subscribe({ topic, callback, subscribeoption });
}
}
private processpendingpublications(): void {
while (this.pendingpublications.length > 0) {
const { topic, message } = this.pendingpublications.shift()!;
this.publish({ topic, message });
}
}
}
const clientoptions: clientoptions = {
clean: true,
connecttimeout: 500,
protocolversion: 5,
rejectunauthorized: false,
username: 'admin',
password: 'anjian-emqx',
clientid: `client-${date.now()}`
};
// export default mqtt.getinstance("ws://192.168.11.14:8083/mqtt");
// export default mqtt.getinstance("ws://192.168.11.14:8083/mqtt");
// export default mqtt.getinstance(json.parse(import.meta.env.vite_other_service_base_url).mqtt);
const { protocol, host } = window.location;
export default mqtt.getinstance(`${protocol.replace('http', 'ws')}//${host.replace('localhost', '127.0.0.1')}/mqtt/`);
注意:
1.环境配置
.env.test
vite_other_service_base_url= `{
"mqtt": "ws://192.168.11.14:8083/mqtt"
}`
2.qos设置 前后端统一为1

以上就是vue3中如何使用mqtt数据传输的详细内容,更多关于vue3 mqtt数据传输的资料请关注代码网其它相关文章!
发表评论