1、前言
介绍完基础理论部分,下面在windows平台上搭建一个简单的mqtt应用,进行简单的应用,整体架构如下图所示;
消息模型:
运用mqtt协议,设备可以很方便地连接到物联网云服务,管理设备并处理数据,最后应用到各种业务场景,如下图所示
前面介绍过,mqtt可以运行在几乎所有的平台,windows,linux什么的都可以,各种语言都有实现mqtt的组件,.net也好,java也好,都有封装好的mqtt服务器和客户端组件或插件来实现,本系列是在.net平台下实现mqtt的服务器和客户端。
常见的mqtt服务器包括eclipse mosquitto、emq x、hivemq、rabbitmq、mqttnet等,本系列文章都是基于.net平台的mqtt服务通信,开发环境vs2022,.net framework4.8。
2、服务器搭建
1、创建项目方案
2、添加库引用
3、ui布局
4、控件代码
“启动”按钮
停止按钮
窗体加载
全部完整代码
注意这里面用到了多线程及任务task,委托action,异步async及await的技术,在mqtt中必须使用这些技术,否则界面会卡死。
using mqttnet.client.receiving;
using mqttnet.server;
using mqttnet;
using system;
using system.collections.generic;
using system.componentmodel;
using system.data;
using system.drawing;
using system.linq;
using system.text;
using system.threading.tasks;
using system.windows.forms;
using mqttnet.protocol;
namespace mqttnetserverforms
{
public partial class form1 : form
{
private imqttserver server;//mqtt服务器对象
list<topicitem> topics = new list<topicitem>();
public form1()
{
initializecomponent();
}
private void form1_load(object sender, eventargs e)
{
//创建服务器对象
server = new mqttfactory().createmqttserver();
server.applicationmessagereceivedhandler = new mqttapplicationmessagereceivedhandlerdelegate(new action<mqttapplicationmessagereceivedeventargs>(server_applicationmessagereceived));//绑定消息接收事件
server.clientconnectedhandler = new mqttserverclientconnectedhandlerdelegate(new action<mqttserverclientconnectedeventargs>(server_clientconnected));//绑定客户端连接事件
server.clientdisconnectedhandler = new mqttserverclientdisconnectedhandlerdelegate(new action<mqttserverclientdisconnectedeventargs>(server_clientdisconnected));//绑定客户端断开事件
server.clientsubscribedtopichandler = new mqttserverclientsubscribedhandlerdelegate(new action<mqttserverclientsubscribedtopiceventargs>(server_clientsubscribedtopic));//绑定客户端订阅主题事件
server.clientunsubscribedtopichandler = new mqttserverclientunsubscribedtopichandlerdelegate(new action<mqttserverclientunsubscribedtopiceventargs>(server_clientunsubscribedtopic));//绑定客户端退订主题事件
server.startedhandler = new mqttserverstartedhandlerdelegate(new action<eventargs>(server_started));//绑定服务端启动事件
server.stoppedhandler = new mqttserverstoppedhandlerdelegate(new action<eventargs>(server_stopped));//绑定服务端停止事件
}
/// <summary>
/// 绑定消息接收事件
/// </summary>
/// <param name="e"></param>
private void server_applicationmessagereceived(mqttapplicationmessagereceivedeventargs e)
{
string msg = e.applicationmessage.convertpayloadtostring();
writelog(">>> 收到消息:" + msg + ",qos =" + e.applicationmessage.qualityofservicelevel + ",客户端=" + e.clientid + ",主题:" + e.applicationmessage.topic);
}
/// <summary>
/// 绑定客户端连接事件
/// </summary>
/// <param name="e"></param>
private void server_clientconnected(mqttserverclientconnectedeventargs e)
{
task.run(new action(() =>
{
lbclients.begininvoke(new action(() =>
{
lbclients.items.add(e.clientid);
}));
}));
writelog(">>> 客户端" + e.clientid + "连接");
}
/// <summary>
/// 绑定客户端断开事件
/// </summary>
/// <param name="e"></param>
private void server_clientdisconnected(mqttserverclientdisconnectedeventargs e)
{
task.run(new action(() =>
{
lbclients.begininvoke(new action(() =>
{
lbclients.items.remove(e.clientid);
}));
}));
writelog(">>> 客户端" + e.clientid + "断开");
}
/// <summary>
/// 绑定客户端订阅主题事件
/// </summary>
/// <param name="e"></param>
private void server_clientsubscribedtopic(mqttserverclientsubscribedtopiceventargs e)
{
task.run(new action(() =>
{
var topic = topics.firstordefault(t => t.topic == e.topicfilter.topic);
if (topic == null)
{
topic = new topicitem { topic = e.topicfilter.topic, count = 0 };
topics.add(topic);
}
if (!topic.clients.exists(c => c == e.clientid))
{
topic.clients.add(e.clientid);
topic.count++;
}
lvtopic.invoke(new action(() =>
{
this.lvtopic.items.clear();
}));
foreach (var item in this.topics)
{
lvtopic.invoke(new action(() =>
{
this.lvtopic.items.add($"{item.topic}:{item.count}");
}));
}
}));
writelog(">>> 客户端" + e.clientid + "订阅主题" + e.topicfilter.topic);
}
/// <summary>
/// 绑定客户端退订主题事件
/// </summary>
/// <param name="e"></param>
private void server_clientunsubscribedtopic(mqttserverclientunsubscribedtopiceventargs e)
{
task.run(new action(() =>
{
var topic = topics.firstordefault(t => t.topic == e.topicfilter);
if (topic != null)
{
topic.count--;
topic.clients.remove(e.clientid);
}
this.lvtopic.items.clear();
foreach (var item in this.topics)
{
this.lvtopic.items.add($"{item.topic}:{item.count}");
}
}));
writelog(">>> 客户端" + e.clientid + "退订主题" + e.topicfilter);
}
/// <summary>
/// 绑定服务端启动事件
/// </summary>
/// <param name="e"></param>
private void server_started(eventargs e)
{
writelog(">>> 服务端已启动!");
}
/// <summary>
/// 绑定服务端停止事件
/// </summary>
/// <param name="e"></param>
private void server_stopped(eventargs e)
{
writelog(">>> 服务端已停止!");
}
/// <summary>
/// 显示日志
/// </summary>
/// <param name="message"></param>
public void writelog(string message)
{
if (txtmsg.invokerequired)
{
txtmsg.invoke(new action(() =>
{
txtmsg.text = "";
txtmsg.text = (message + "\r");
}));
}
else
{
txtmsg.text = "";
txtmsg.text = (message + "\r");
}
}
/// <summary>
/// 启动
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
[obsolete]
private async void btnstart_click(object sender, eventargs e)
{
var optionbuilder = new mqttserveroptionsbuilder()
.withdefaultendpointboundipaddress(system.net.ipaddress.parse(this.txtip.text))
.withdefaultendpointport(int.parse(this.txtport.text))
.withdefaultcommunicationtimeout(timespan.frommilliseconds(5000))
.withconnectionvalidator(t =>
{
string un = "", pwd = "";
un = this.txtuname.text;
pwd = this.txtupwd.text;
if (t.username != un || t.password != pwd)
{
t.returncode = mqttconnectreturncode.connectionrefusedbadusernameorpassword;
}
else
{
t.returncode = mqttconnectreturncode.connectionaccepted;
}
});
var option = optionbuilder.build();
//启动
await server.startasync(option);
writelog(">>> 服务器启动成功");
}
/// <summary>
/// 停止
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void btnstop_click(object sender, eventargs e)
{
if (server != null)
{
server.stopasync();
}
}
}
}
注意这个端口,帐号,密码可以自己决定,ip地址也是。
注意这里面的代码,服务器上必须注册绑定实现的几个事件:消息接收事件 ,客户端连接事件,客户端断开事件,客户端订阅主题事件,客户端退订主题事件,服务端启动事件,服务端停止事件
server.applicationmessagereceivedhandler = new mqttapplicationmessagereceivedhandlerdelegate(new action<mqttapplicationmessagereceivedeventargs>(server_applicationmessagereceived));//绑定消息接收事件
server.clientconnectedhandler = new mqttserverclientconnectedhandlerdelegate(new action<mqttserverclientconnectedeventargs>(server_clientconnected));//绑定客户端连接事件
server.clientdisconnectedhandler = new mqttserverclientdisconnectedhandlerdelegate(new action<mqttserverclientdisconnectedeventargs>(server_clientdisconnected));//绑定客户端断开事件
server.clientsubscribedtopichandler = new mqttserverclientsubscribedhandlerdelegate(new action<mqttserverclientsubscribedtopiceventargs>(server_clientsubscribedtopic));//绑定客户端订阅主题事件
server.clientunsubscribedtopichandler = new mqttserverclientunsubscribedtopichandlerdelegate(new action<mqttserverclientunsubscribedtopiceventargs>(server_clientunsubscribedtopic));//绑定客户端退订主题事件
server.startedhandler = new mqttserverstartedhandlerdelegate(new action<eventargs>(server_started));//绑定服务端启动事件
server.stoppedhandler = new mqttserverstoppedhandlerdelegate(new action<eventargs>(server_stopped));//绑定服务端停止事件
启动测试服务器
启动成功。
3、客户端创建
1、添加项目
2、添加库引用
注意这里添加的与服务器不一样,别混错了
3、ui布局
布局使用的是常规的label,textbox,button
4、控件代码
连接代码
订阅代码
发布代码
完整代码
using mqttnet.client.options;
using mqttnet.client;
using mqttnet.extensions.managedclient;
using system;
using system.collections.generic;
using system.componentmodel;
using system.data;
using system.drawing;
using system.linq;
using system.text;
using system.threading.tasks;
using system.windows.forms;
using mqttnet;
namespace mqttnetclientforms
{
public partial class form1 : form
{
private imanagedmqttclient mqttclient;//客户端mqtt对象
public form1()
{
initializecomponent();
}
/// <summary>
/// 连接
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private async void btnconn_click(object sender, eventargs e)
{
var mqttclientoptions = new mqttclientoptionsbuilder()
.withclientid(this.txtid.text)
.withtcpserver(this.txtip.text, int.parse(this.txtport.text))
.withcredentials(this.txtname.text, this.txtupwd.text);
var options = new managedmqttclientoptionsbuilder()
.withautoreconnectdelay(timespan.fromseconds(5))
.withclientoptions(mqttclientoptions.build())
.build();
//开启
await mqttclient.startasync(options);
}
/// <summary>
/// 断开
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private async void btnclose_click(object sender, eventargs e)
{
if (mqttclient != null)
{
if (mqttclient.isstarted)
{
await mqttclient.stopasync();
}
mqttclient.dispose();
}
}
private void form1_load(object sender, eventargs e)
{
var factory = new mqttfactory();
mqttclient = factory.createmanagedmqttclient();//创建客户端对象
//绑定断开事件
mqttclient.usedisconnectedhandler(async ee =>
{
writelog("与服务器之间的连接断开了,正在尝试重新连接");
// 等待 5s 时间
await task.delay(timespan.fromseconds(5));
try
{
mqttclient.useconnectedhandler(tt =>
{
writelog(">>> 连接到服务成功");
});
}
catch (exception ex)
{
writelog($"重新连接服务器失败:{ex}");
}
});
//绑定接收事件
mqttclient.useapplicationmessagereceivedhandler(aa =>
{
try
{
string msg = aa.applicationmessage.convertpayloadtostring();
writelog(">>> 消息:" + msg + ",qos =" + aa.applicationmessage.qualityofservicelevel + ",客户端=" + aa.clientid + ",主题:" + aa.applicationmessage.topic);
}
catch (exception ex)
{
writelog($"+ 消息 = " + ex.message);
}
});
//绑定连接事件
mqttclient.useconnectedhandler(ee =>
{
writelog(">>> 连接到服务成功");
});
}
/// <summary>
/// 显示日志
/// </summary>
/// <param name="message"></param>
private void writelog(string message)
{
if (txtmsg.invokerequired)
{
txtmsg.invoke(new action(() =>
{
txtmsg.text = (message);
}));
}
else
{
txtmsg.text = (message);
}
}
/// <summary>
/// 订阅
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
[obsolete]
private async void btnsub_click(object sender, eventargs e)
{
if (string.isnullorwhitespace(this.txttopic.text))
{
writelog(">>> 请输入主题");
return;
}
//在 mqtt 中有三种 qos 级别:
//at most once(0) 最多一次
//at least once(1) 至少一次
//exactly once(2) 恰好一次
//await mqttclient.subscribeasync(new topicfilterbuilder().withtopic(this.tbtopic.text).withatmostonceqos().build());//最多一次, qos 级别0
await mqttclient.subscribeasync(new topicfilterbuilder().withtopic(this.txttopic.text).withatleastonceqos().build());//恰好一次, qos 级别1
writelog($">>> 成功订阅");
}
/// <summary>
/// 发布
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private async void btnpub_click(object sender, eventargs e)
{
if (string.isnullorwhitespace(this.txttopik.text))
{
writelog(">>> 请输入主题");
return;
}
var result = await mqttclient.publishasync(
this.txttopik.text,
this.txtcontent.text,
mqttnet.protocol.mqttqualityofservicelevel.atleastonce);//恰好一次, qos 级别1
writelog($">>> 主题:{this.txttopik.text},消息:{this.txtcontent.text},结果: {result.reasoncode}");
}
}
}
4、运行测试
生成编译解决方案,成功后开始测试
1、启动服务器
2、启动客户端
找到生成的客户端debug目录下的.exe文件
3、测试连接
连接成功,服务器看到客户端上线了
4、测试订阅
再运行一个客户端,连接服务器
5、测试发布
c1向cced主题发布一个消息,结果是c1,c2都收到了消息
同样,c2发布一个消息,c1,c2都收到了消息
6、测试下线
c1关闭,服务器马上知道了
5、小结
基于mqttnet的组件搭建的mqtt服务器和客户端通信成功,发布和订阅都ko,ko,ko。
讲解不易,分析不易,原创不易,整理不易,伙伴们动动你的金手指,你的支持是我最大的动力。
发表评论