使用版本
"mqtt": "^5.8.0",
安装指令
npm install mqtt --save ------ yarn add mqtt
配置
connection: { protocol: "ws", host: "broker.emqx.io", port: 8083, endpoint: "/mqtt", clean: true, connecttimeout: 30 * 1000, // ms reconnectperiod: 4000, // ms clientid: "emqx_vue_" + math.random().tostring(16).substring(2, 8), // 随机数 每次不能重复 username: "emqx_test", password: "emqx_test", },
连接
import mqtt from "mqtt"; let client = {} client = mqtt.connect(url, options) client.on('connect', (e) => { // 订阅主题 })
订阅主题
client.subscribe(topic, { qos: 1 }, (err) => { if (!err) { console.log('订阅成功') } else { console.log('消息订阅失败!') } })
消息发布
给后端发送格式,是和后端约定好的数据格式,一般为json传输。
client.publish(publishtopic, `{"messagetype":1,"messagecontent":""}`, { qos: 0 }, (err) => { if (!err) { console.log('发送成功') client.subscribe(topic, { qos: 1 }, (err) => { if (!err) { console.log('订阅成功') } else { console.log('消息订阅失败!') } }) } else { console.log('消息发送失败!') } })
取消订阅
client.unsubscribe(topiclist, (error) => { console.log('主题为' + topiclist + '取消订阅成功', error) })
断开连接
export function unconnect() { client.end() client = null // message.warning('服务器已断开连接!') console.log('服务器已断开连接!') }
mqtt封装使用(ts版)
import type { iclientoptions, mqttclient } from 'mqtt'; import mqtt from 'mqtt'; interface clientoptions extends iclientoptions { clientid: string; } interface subscribeoptions { topic: string; callback: (topic: string, message: string) => void; subscribeoption?: mqtt.iclientsubscribeoptions; } interface publishoptions { topic: string; message: string; } class mqtt { private static instance: mqtt; private client: mqttclient | undefined; private subscribemembers: record<string, ((topic: string, message: string) => void) | undefined> = {}; private pendingsubscriptions: subscribeoptions[] = []; private pendingpublications: publishoptions[] = []; private isconnected: boolean = false; private constructor(url?: string) { if (url) { this.connect(url); } } public static getinstance(url?: string): mqtt { if (!mqtt.instance) { mqtt.instance = new mqtt(url); } else if (url && !mqtt.instance.client) { mqtt.instance.connect(url); } return mqtt.instance; } private connect(url: string): void { console.log(url, clientoptions); if (!this.client) { this.client = mqtt.connect(url, clientoptions); this.client.on('connect', this.onconnect); this.client.on('reconnect', this.onreconnect); this.client.on('error', this.onerror); this.client.on('message', this.onmessage); } } public disconnect(): void { if (this.client) { this.client.end(); this.client = undefined; this.subscribemembers = {}; this.isconnected = false; console.log(`服务器已断开连接!`); } } public subscribe({ topic, callback }: subscribeoptions): void { if (this.isconnected) { this.client?.subscribe(topic, { qos: 1 }, error => { if (error) { console.log(`客户端: ${clientoptions.clientid}, 订阅主题: ${topic}失败: `, error); } else { console.log(`客户端: ${clientoptions.clientid}, 订阅主题: ${topic}成功`); } }); this.subscribemembers[topic] = callback; } else { this.pendingsubscriptions.push({ topic, callback }); } } public unsubscribe(topic: string): void { if (!this.client) { return; } this.client.unsubscribe(topic, error => { if (error) { console.log(`客户端: ${clientoptions.clientid}, 取消订阅主题: ${topic}失败: `, error); } else { console.log(`客户端: ${clientoptions.clientid}, 取消订阅主题: ${topic}成功`); } }); this.subscribemembers[topic] = undefined; } public publish({ topic, message }: publishoptions): void { if (this.isconnected) { this.client?.publish(topic, message, { qos: 1 }, e => { if (e) { console.log(`客户端: ${clientoptions.clientid}, 发送主题为: ${topic} 的消息, 发送失败: `, e); } }); } else { this.pendingpublications.push({ topic, message }); } } private onconnect = (e: any): void => { console.log(`客户端: ${clientoptions.clientid}, 连接服务器成功:`, e); this.isconnected = true; this.processpendingsubscriptions(); this.processpendingpublications(); }; private onreconnect = (): void => { console.log(`客户端: ${clientoptions.clientid}, 正在重连:`); this.isconnected = false; }; private onerror = (error: error): void => { console.log(`客户端: ${clientoptions.clientid}, 连接失败:`, error); this.isconnected = false; }; private onmessage = (topic: string, message: buffer): void => { // console.log( // `客户端: ${clientoptions.clientid}, 接收到来自主题: ${topic} 的消息: `, // message.tostring(), // ); const callback = this.subscribemembers?.[topic]; callback?.(topic, message.tostring()); }; private processpendingsubscriptions(): void { while (this.pendingsubscriptions.length > 0) { const { topic, callback, subscribeoption } = this.pendingsubscriptions.shift()!; this.subscribe({ topic, callback, subscribeoption }); } } private processpendingpublications(): void { while (this.pendingpublications.length > 0) { const { topic, message } = this.pendingpublications.shift()!; this.publish({ topic, message }); } } } const clientoptions: clientoptions = { clean: true, connecttimeout: 500, protocolversion: 5, rejectunauthorized: false, username: 'admin', password: 'anjian-emqx', clientid: `client-${date.now()}` }; // export default mqtt.getinstance("ws://192.168.11.14:8083/mqtt"); // export default mqtt.getinstance("ws://192.168.11.14:8083/mqtt"); // export default mqtt.getinstance(json.parse(import.meta.env.vite_other_service_base_url).mqtt); const { protocol, host } = window.location; export default mqtt.getinstance(`${protocol.replace('http', 'ws')}//${host.replace('localhost', '127.0.0.1')}/mqtt/`);
注意:
1.环境配置
.env.test vite_other_service_base_url= `{ "mqtt": "ws://192.168.11.14:8083/mqtt" }`
2.qos设置 前后端统一为1
以上就是vue3中如何使用mqtt数据传输的详细内容,更多关于vue3 mqtt数据传输的资料请关注代码网其它相关文章!
发表评论