当前位置: 代码网 > it编程>编程语言>Asp.net > C#使用 MQTTnet 快速实现 MQTT 通信

C#使用 MQTTnet 快速实现 MQTT 通信

2024年08月01日 Asp.net 我要评论
Console.WriteLine($"客户端[{e.ClientId}]>> 主题:{e.ApplicationMessage.Topic} 负荷:{Encoding.UTF8.GetString(e.ApplicationMessage.Payload)} Qos:{e.ApplicationMessage.QualityOfServiceLevel} 保留:{e.ApplicationMessage.Retain}");MQTT 服务端主要用于与多个客户端保持连接,并处理客户端的发布和订阅等逻辑。

1 什么是 mqtt ?
mqtt(message queuing telemetry transport,消息队列遥测传输)是 ibm 开发的一个即时通讯协议,有可能成为物联网的重要组成部分。mqtt 是基于二进制消息的发布/订阅编程模式的消息协议,如今已经成为 oasis 规范,由于规范很简单,非常适合需要低功耗和网络带宽有限的 iot 场景。mqtt官网

2 mqttnet
mqttnet 是一个基于 mqtt 通信的高性能 .net 开源库,它同时支持 mqtt 服务器端和客户端。而且作者也保持更新,目前支持新版的.net core,这也是选择 mqttnet 的原因。 mqttnet 在 github 并不是下载最多的 .net 的 mqtt 开源库,其他的还 mqttdotnet、nmqtt、m2mqtt 等

mqttnet is a high performance .net library for mqtt based communication. it provides a mqtt client and a mqtt server (broker). the implementation is based on the documentation from http://mqtt.org/.

3 创建项目并导入类库
这里我们使用 visual studio 2017 创建一个空解决方案,并在其中添加两个项目,即一个服务端和一个客户端,服务端项目模板选择最新的 .net core 控制台应用,客户端项目选择传统的 winform 窗体应用程序。.net core 项目模板如下图所示: 

在解决方案在右键单击-选择“管理解决方案的 nuget 程序包”-在“浏览”选项卡下面搜索 mqttnet,为服务端项目和客户端项目都安装上 mqttnet 库,当前最新稳定版为 2.4.0。项目结构如下图所示: 


4 服务端
mqtt 服务端主要用于与多个客户端保持连接,并处理客户端的发布和订阅等逻辑。一般很少直接从服务端发送消息给客户端(可以使用 mqttserver.publish(appmsg); 直接发送消息),多数情况下服务端都是转发主题匹配的客户端消息,在系统中起到一个中介的作用。

4.1 创建服务端并启动
创建服务端最简单的方式是采用 mqttserverfactory 对象的 createmqttserver 方法来实现,该方法需要一个mqttserveroptions 参数。

var options = new mqttserveroptions();
var mqttserver = new mqttserverfactory().createmqttserver(options);
通过上述方式创建了一个 imqttserver 对象后,调用其 startasync 方法即可启动 mqtt 服务。值得注意的是:之前版本采用的是 start 方法,作者也是紧跟 c# 语言新特性,能使用异步的地方也都改为异步方式。

await mqttserver.startasync();
 

4.2 验证客户端

在 mqttserveroptions 选项中,你可以使用 connectionvalidator 来对客户端连接进行验证。比如客户端id标识 clientid,用户名 username 和密码 password 等。

var options = new mqttserveroptions
{
connectionvalidator = c =>
{
if (c.clientid.length < 10)
{
return mqttconnectreturncode.connectionrefusedidentifierrejected;
}

if (c.username != "xxx" || c.password != "xxx")
{
return mqttconnectreturncode.connectionrefusedbadusernameorpassword;
}

return mqttconnectreturncode.connectionaccepted;
}
};
 

4.3 相关事件

服务端支持 clientconnected、clientdisconnected 和 applicationmessagereceived 事件,分别用来检查客户端连接、客户端断开以及接收客户端发来的消息。

其中 clientconnected 和 clientdisconnected 事件的事件参数一个客户端连接对象 connectedmqttclient,通过该对象可以获取客户端id标识 clientid 和 mqtt 版本 protocolversion。

applicationmessagereceived 的事件参数包含了客户端id标识 clientid 和 mqtt 应用消息 mqttapplicationmessage 对象,通过该对象可以获取主题 topic、qos qualityofservicelevel 和消息内容 payload 等信息。

5 客户端
mqtt 与 http 不同,后者是基于请求/响应方式的,服务器端无法直接发送数据给客户端。而 mqtt 是基于发布/订阅模式的,所有的客户端均与服务端保持连接状态。

那么客户端之间是如何通信的呢?

具体逻辑是:某些客户端向服务端订阅它感兴趣(主题)的消息,另一些客户端向服务端发布(主题)消息,服务端将订阅和发布的主题进行匹配,并将消息转发给匹配通过的客户端。

5.1 创建客户端并连接
使用 mqttnet 创建 mqtt 也非常简单,只需要使用 mqttclientfactory 对象的 createmqttclient 方法即可。

var mqttclient = new mqttclientfactory().createmqttclient();
 

创建客户端对象后,调用其异步方法 connectasync 来连接到服务端。

await mqttclient.connectasync(options);
 

调用该方法时需要传递一个 mqttclienttcpoptions 对象(之前的版本是在创建对象时使用该选项),该选项包含了客户端id标识 clientid、服务端地址(可以使用ip地址或域名)server、端口号 port、用户名 username、密码 password 等信息。

var options = new mqttclienttcpoptions
{
server = "127.0.0.1",
clientid = "c001",
username = "u001",
password = "p001",
cleansession = true
};
 

5.2 相关事件

客户端支持 connected、disconnected 和 applicationmessagereceived 事件,用来处理客户端与服务端连接、客户端从服务端断开以及客户端收到消息的事情。

5.2 订阅消息
客户端连接到服务端之后,可以使用 subscribeasync 异步方法订阅消息,该方法可以传入一个可枚举或可变参数的主题过滤器 topicfilter 参数,主题过滤器包含主题名和 qos 等级。

mqttclient.subscribeasync(new list<topicfilter> {
new topicfilter("家/客厅/空调/#", mqttqualityofservicelevel.atmostonce)
});
 

5.3 发布消息

发布消息前需要先构建一个消息对象 mqttapplicationmessage,最直接的方法是使用其实构造函数,传入主题、内容、qos 等参数。

mqttclient.subscribeasync(new list<topicfilter> {
new topicfilter("家/客厅/空调/#", mqttqualityofservicelevel.atmostonce)
});
 

得到 mqttapplicationmessage 消息对象后,通过客户端对象调用其 publishasync 异步方法进行消息发布。

mqttclient.publishasync(appmsg);
 

6 跟踪消息

mqttnet 提供了一个静态类 mqttnettrace 来对消息进行跟踪,该类可用于服务端和客户端。mqttnettrace 的事件tracemessagepublished 用于跟踪服务端和客户端应用的日志消息,比如启动、停止、心跳、消息订阅和发布等。事件参数mqttnettracemessagepublishedeventargs 包含了线程id threadid、来源 source、日志级别 level、日志消息 message、异常信息 exception 等。

mqttnettrace.tracemessagepublished += mqttnettrace_tracemessagepublished;

private static void mqttnettrace_tracemessagepublished(object sender, mqttnettracemessagepublishedeventargs e)
{
console.writeline($">> 线程id:{e.threadid} 来源:{e.source} 跟踪级别:{e.level} 消息: {e.message}");

if (e.exception != null)
{
console.writeline(e.exception);
}
}
 

同时 mqttnettrace 类还提供了4个不同消息等级的静态方法,verbose、information、warning 和 error,用于给出不同级别的日志消息,该消息将会在 tracemessagepublished 事件中输出,你可以使用 e.level 进行过虑。

7 运行效果
以下分别是服务端、客户端1和客户端2的运行效果,其中客户端1和客户端2只是同一个项目运行了两个实例。客户端1用于订阅传感器的“温度”数据,并模拟上位机(如 app 等)发送开关控制命令;客户端2订阅上位机传来的“开关”控制命令,并模拟温度传感器上报温度数据。

7.1 服务端


7.2 客户端1


7.2 客户端2


8 demo代码
8.1 服务端代码
 
using mqttnet;
using mqttnet.core.adapter;
using mqttnet.core.diagnostics;
using mqttnet.core.protocol;
using mqttnet.core.server;
using system;
using system.text;
using system.threading;

namespace mqttservertest
{
class program
{
private static mqttserver mqttserver = null;

static void main(string[] args)
{
mqttnettrace.tracemessagepublished += mqttnettrace_tracemessagepublished;
new thread(startmqttserver).start();

while (true)
{
var inputstring = console.readline().tolower().trim();

if (inputstring == "exit")
{
mqttserver?.stopasync();
console.writeline("mqtt服务已停止!");
break;
}
else if (inputstring == "clients")
{
foreach (var item in mqttserver.getconnectedclients())
{
console.writeline($"客户端标识:{item.clientid},协议版本:{item.protocolversion}");
}
}
else
{
console.writeline($"命令[{inputstring}]无效!");
}
}
}

private static void startmqttserver()
{
if (mqttserver == null)
{
try
{
var options = new mqttserveroptions
{
connectionvalidator = p =>
{
if (p.clientid == "c001")
{
if (p.username != "u001" || p.password != "p001")
{
return mqttconnectreturncode.connectionrefusedbadusernameorpassword;
}
}

return mqttconnectreturncode.connectionaccepted;
}
};

mqttserver = new mqttserverfactory().createmqttserver(options) as mqttserver;
mqttserver.applicationmessagereceived += mqttserver_applicationmessagereceived;
mqttserver.clientconnected += mqttserver_clientconnected;
mqttserver.clientdisconnected += mqttserver_clientdisconnected;
}
catch (exception ex)
{
console.writeline(ex.message);
return;
}
}

mqttserver.startasync();
console.writeline("mqtt服务启动成功!");
}

private static void mqttserver_clientconnected(object sender, mqttclientconnectedeventargs e)
{
console.writeline($"客户端[{e.client.clientid}]已连接,协议版本:{e.client.protocolversion}");
}

private static void mqttserver_clientdisconnected(object sender, mqttclientdisconnectedeventargs e)
{
console.writeline($"客户端[{e.client.clientid}]已断开连接!");
}

private static void mqttserver_applicationmessagereceived(object sender, mqttapplicationmessagereceivedeventargs e)
{
console.writeline($"客户端[{e.clientid}]>> 主题:{e.applicationmessage.topic} 负荷:{encoding.utf8.getstring(e.applicationmessage.payload)} qos:{e.applicationmessage.qualityofservicelevel} 保留:{e.applicationmessage.retain}");
}

private static void mqttnettrace_tracemessagepublished(object sender, mqttnettracemessagepublishedeventargs e)
{
/*console.writeline($">> 线程id:{e.threadid} 来源:{e.source} 跟踪级别:{e.level} 消息: {e.message}");
if (e.exception != null)
{
console.writeline(e.exception);
}*/
}
}
}
8.2 客户端代码
 
using mqttnet;
using mqttnet.core;
using mqttnet.core.client;
using mqttnet.core.packets;
using mqttnet.core.protocol;
using system;
using system.collections.generic;
using system.text;
using system.threading.tasks;
using system.windows.forms;

namespace mqttclientwin
{
public partial class fmmqttclient : form
{
private mqttclient mqttclient = null;

public fmmqttclient()
{
initializecomponent();

task.run(async () => { await connectmqttserverasync(); });
}

private async task connectmqttserverasync()
{
if (mqttclient == null)
{
mqttclient = new mqttclientfactory().createmqttclient() as mqttclient;
mqttclient.applicationmessagereceived += mqttclient_applicationmessagereceived;
mqttclient.connected += mqttclient_connected;
mqttclient.disconnected += mqttclient_disconnected;
}

try
{
var options = new mqttclienttcpoptions
{
server = "127.0.0.1",
clientid = guid.newguid().tostring().substring(0, 5),
username = "u001",
password = "p001",
cleansession = true
};

await mqttclient.connectasync(options);
}
catch (exception ex)
{
invoke((new action(() =>
{
txtreceivemessage.appendtext($"连接到mqtt服务器失败!" + environment.newline + ex.message + environment.newline);
})));
}
}

private void mqttclient_connected(object sender, eventargs e)
{
invoke((new action(() =>
{
txtreceivemessage.appendtext("已连接到mqtt服务器!" + environment.newline);
})));
}

private void mqttclient_disconnected(object sender, eventargs e)
{
invoke((new action(() =>
{
txtreceivemessage.appendtext("已断开mqtt连接!" + environment.newline);
})));
}

private void mqttclient_applicationmessagereceived(object sender, mqttapplicationmessagereceivedeventargs e)
{
invoke((new action(() =>
{
txtreceivemessage.appendtext($">> {encoding.utf8.getstring(e.applicationmessage.payload)}{environment.newline}");
})));
}

private void btnsubscribe_clickasync(object sender, eventargs e)
{
string topic = txtsubtopic.text.trim();

if (string.isnullorempty(topic))
{
messagebox.show("订阅主题不能为空!");
return;
}

if (!mqttclient.isconnected)
{
messagebox.show("mqtt客户端尚未连接!");
return;
}

mqttclient.subscribeasync(new list<topicfilter> {
new topicfilter(topic, mqttqualityofservicelevel.atmostonce)
});

txtreceivemessage.appendtext($"已订阅[{topic}]主题" + environment.newline);
txtsubtopic.enabled = false;
btnsubscribe.enabled = false;
}

private void btnpublish_click(object sender, eventargs e)
{
string topic = txtpubtopic.text.trim();

if (string.isnullorempty(topic))
{
messagebox.show("发布主题不能为空!");
return;
}

string inputstring = txtsendmessage.text.trim();
var appmsg = new mqttapplicationmessage(topic, encoding.utf8.getbytes(inputstring), mqttqualityofservicelevel.atmostonce, false);
mqttclient.publishasync(appmsg);
}
}
}

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com