mqtt(message queueing telemetry transport) 消息队列遥测传输,在物联网领域应用的很广泛,它是基于publish/subscribe模式,具有简单易用,支持qos,传输效率高的特点。
它被设计用于低带宽,不稳定或高延迟的网络环境,因此非常适合于设备之间的数据通信。
emqx提供了mqtt的服务器,并且可以在后台网页查看面板,还支持中文显示。
由于5.0之后的版本不再支持windows所以使用的是4.0版本的包,在下载完压缩包后,不用安装,进入cmd导航到安装的bin目录下(注意:路径中不能包含中文),执行命令:emqx start,看见没有报错就说明启动成功了。
之后在浏览器里输入:http://127.0.0.1:18083 进入面板。

在websocket菜单里可以模拟发布/订阅的操作,接下来我们将使用c#完成这一系列的操作。
1、连接主机
首先新建一个wpf项目,然后在nuget中下载mqttnet。
// 连接主机
mqttfactory factory = new mqttfactory();
_client = factory.createmqttclient();
var options = new mqttclientoptionsbuilder().
withtcpserver(this.ipaddress.text, convert.toint32(this.port.text))
.withclientid(this.clientid.text)
.build();
var result = await _client.connectasync(options, cancellationtoken.none);
if (result.resultcode == mqttclientconnectresultcode.success)
{
this.log.text = datetime.now.tostring() + " 连接成功" + environment.newline + this.log.text;
}
else
{
this.log.text = datetime.now.tostring() + $" 连接失败,{result.reasonstring}" + environment.newline + this.log.text;
return;
}2、订阅消息
订阅消息分为两块,一个是消息的回显,一个是订阅消息。
// 订阅消息
var option = new mqttclientsubscribeoptions();
mqttqualityofservicelevel level;
switch (this.subscribeqos.selectedindex)
{
case 0:
level = mqttnet.protocol.mqttqualityofservicelevel.atmostonce;
break;
case 1:
level = mqttnet.protocol.mqttqualityofservicelevel.atleastonce;
break;
case 2:
level = mqttnet.protocol.mqttqualityofservicelevel.exactlyonce;
break;
default:
throw new exception("请选择服务质量");
}
option.topicfilters = new list<mqtttopicfilter>()
{
new mqtttopicfilter()
{
topic = this.subscribetopic.text,
qualityofservicelevel = level
}
};
this._client.subscribeasync(option, cancellationtoken.none);
// 将订阅的消息回显到日志区
this._client.applicationmessagereceivedasync += e =>
{
var task = task.factory.startnew(() => {
try
{
var array = e.applicationmessage.payloadsegment.array;
if (array == null)
{
return;
}
var str = encoding.utf8.getstring(array);
// 跨线程更新ui
application.current.dispatcher.invoke(() => {
this.log.text = datetime.now.tostring() + " 收到消息:" + str + environment.newline + this.log.text;
});
}
catch (exception ex)
{
this.log.text = datetime.now.tostring() + $" {ex.message}" + environment.newline + this.log.text;
}
});
return task;
};
this.log.text = datetime.now.tostring() + " 订阅成功" + environment.newline + this.log.text;订阅消息只需要两个参数:主题topic和服务质量qoc,主题是用来区分不同频段的消息,避免出现冲突,如果想接收到所有的消息可以这么写:topicxxx/#,#就代表不限制范围,如果打算只接受固定区域的消息,则需要将#改成某个字符串。
服务质量qoc是用来控制可用性的,0是最低等级,最多只发送一次,1是中级,至少发一次,但有可能出现重复接收的情况,2是最高级,只发一次,不会多也不会少。
将消息回显需要注册applicationmessagereceivedasync事件,传入的参数是回显对象,返回值是一个task类型,是在task中获取回显的值并完成控件的更新操作。
3、发布消息
发布消息的参数比订阅多两个:消息内容payload,持久会话(在恢复连接后保留之前的订阅和消息传递状态)
var msg = new mqttapplicationmessage();
msg.topic = this.topic.text;
msg.payloadsegment = encoding.utf8.getbytes(this.msg.text);
msg.retain = issave.ischecked??false;
mqttqualityofservicelevel level;
switch (this.publishqos.selectedindex)
{
case 0:
level = mqttnet.protocol.mqttqualityofservicelevel.atmostonce;
break;
case 1:
level = mqttnet.protocol.mqttqualityofservicelevel.atleastonce;
break;
case 2:
level = mqttnet.protocol.mqttqualityofservicelevel.exactlyonce;
break;
default:
throw new exception("请选择服务质量");
}
msg.qualityofservicelevel = level;
var resultpublish = await _client.publishasync(msg, cancellationtoken.none);
if (resultpublish.issuccess == true)
{
this.log.text = datetime.now.tostring() + " 发送成功" + environment.newline + this.log.text;
}
else
{
this.log.text = datetime.now.tostring() + " 发送失败" + environment.newline + this.log.text;
}到此这篇关于基于c#实现mqtt通信实战的文章就介绍到这了,更多相关c# mqtt通信内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论