1 dll版本
mqttnet.dll版本-2.7.5.0
基于比较老的项目中应用的dll,其他更高版本变化可能较大,谨慎参考。
2 服务器
开启服务器
关闭服务器
绑定事件【客户端连接服务器事件】
绑定事件【客户端断开(服务器)连接事件】
绑定事件【客户端订阅主题事件】
绑定事件【客户端退订主题事件】
绑定事件【接收客户端(发送)消息事件】
using system;
using system.net;
using mqttnet;
using mqttnet.server;
namespace demo_mqtt.model
{
public class servermodel
{
private static mqttserver _mqttserver = null;
private readonly action<string> _callbacklog;
public servermodel(action<string> callbacklog)
{
_callbacklog = callbacklog;
}
/// <summary>
/// 绑定客户端连接服务器事件
/// </summary>
private void mqttserver_clientconnected(object sender, mqttclientconnectedeventargs e)
{
writelog($"客户端[{e.client.clientid}]已连接 {datetime.now:yyyy-mm-dd hh:mm:ss}{environment.newline}");
}
/// <summary>
/// 绑定客户端断开连接事件
/// </summary>
private void mqttserver_clientdisconnected(object sender, mqttclientdisconnectedeventargs e)
{
writelog($"客户端[{e.client.clientid}]已断开连接 {datetime.now:yyyy-mm-dd hh:mm:ss}{environment.newline}");
}
/// <summary>
/// 绑定客户端订阅主题事件
/// </summary>
private void server_clientsubscribedtopic(object sensor, mqttclientsubscribedtopiceventargs e)
{
writelog($">>> 客户端{e.clientid}订阅主题{e.topicfilter.topic}");
}
/// <summary>
/// 绑定客户端退订主题事件
/// </summary>
/// <param name="e"></param>
private void server_clientunsubscribedtopic(object sensor, mqttclientunsubscribedtopiceventargs e)
{
writelog($">>> 客户端{e.clientid}退订主题{e.topicfilter}");
}
/// <summary>
/// 绑定接收客户端消息事件
/// </summary>
private void mqttserver_applicationmessagereceived(object sender, mqttapplicationmessagereceivedeventargs e)
{
writelog($"接收到{e.clientid}发送来的消息! {datetime.now:yyyy-mm-dd hh:mm:ss} {environment.newline}");
}
private void writelog(string log)
{
_callbacklog?.invoke(log);
}
/// <summary>
/// 开启服务器
/// </summary>
/// <param name="ip">ip地址</param>
/// <param name="port">端口号</param>
public void startserver(string ip, int port)
{
if (_mqttserver == null)
{
var optionsbuilder = new mqttserveroptionsbuilder()
.withdefaultendpointboundipaddress(ipaddress.parse(ip))
.withconnectionbacklog(1000)
.withdefaultendpointport(port);
imqttserveroptions options = optionsbuilder.build();
try
{
_mqttserver = new mqttfactory().createmqttserver() as mqttserver;
_mqttserver.clientconnected += mqttserver_clientconnected;
_mqttserver.clientdisconnected += mqttserver_clientdisconnected;
_mqttserver.applicationmessagereceived += mqttserver_applicationmessagereceived;
_mqttserver.clientsubscribedtopic += server_clientsubscribedtopic;
_mqttserver.clientunsubscribedtopic += server_clientunsubscribedtopic;
_mqttserver.startasync(options);
}
catch (exception ex)
{
console.writeline(ex.message);
return;
}
writelog($"mqtt服务器启动成功 {datetime.now:yyyy-mm-dd hh:mm:ss}{environment.newline}");
}
}
/// <summary>
/// 关闭服务器
/// </summary>
public void closeserver()
{
_mqttserver?.stopasync();
}
}
}
3 客户端
连接服务器
属性:客户端连接状态
客户端断开重连线程
获取所有订阅主题
订阅主题
退订主题
发送消息
绑定事件【客户端连接服务器事件】
绑定事件【客户端断开(服务器)连接事件】
绑定事件【客户端接收消息事件】
using system;
using system.collections.generic;
using system.linq;
using system.text;
using system.threading;
using system.threading.tasks;
using mqttnet;
using mqttnet.client;
using mqttnet.protocol;
namespace demo_mqtt.model
{
public class clientmodel
{
/// <summary>
/// 记录所有订阅主题,用于断开重连时重新订阅主题
/// </summary>
private readonly list<string> _subscribetopics = new list<string>();
private mqttclient _mqttclient = null;
private string _serverip;
private int _nserverport;
private bool _isrunningreconnectthreadstart = false;
private string _clienid;
/// <summary>
/// 接受消息回调函数,参数:主题,消息内容
/// </summary>
private readonly action<string, byte[]> _callbackreceived;
private readonly action<string> _callbacklog;
/// <summary>
/// 构造函数
/// </summary>
/// <param name="callbackreceived">接受消息回调函数,参数:主题,消息内容</param>
/// <param name="callbacklog"></param>
public clientmodel(action<string, byte[]> callbackreceived, action<string> callbacklog)
{
_callbackreceived = callbackreceived;
_callbacklog = callbacklog;
}
/// <summary>
/// 连接服务器
/// </summary>
private async void connectserver()
{
try
{
if (_mqttclient == null)
{
_mqttclient = new mqttfactory().createmqttclient() as mqttclient;
_mqttclient.connected += (s, a) => writelog($"【{_clienid}】已连接到mqtt服务器!");
_mqttclient.disconnected += (s, a) => writelog($"【{_clienid}】已断开mqtt连接!");
_mqttclient.applicationmessagereceived += (sender, args) =>
{
_callbackreceived?.invoke(args.applicationmessage.topic, args.applicationmessage.payload);
};
}
if (_mqttclient.isconnected) return;
imqttclientoptions options = new mqttclientoptions
{
channeloptions = new mqttclienttcpoptions()
{
server = _serverip,
port = _nserverport
},
cleansession = true
};
_clienid = options.clientid;
await _mqttclient.connectasync(options);
if (_mqttclient.isconnected)
{
reconnectthreadstart();
subscribeasync();
}
}
catch (exception ex)
{
writelog("连接到mqtt服务器失败!");
}
}
/// <summary>
/// 客户端重连服务器线程-启动
/// </summary>
/// <returns></returns>
private void reconnectthreadstart()
{
if (_isrunningreconnectthreadstart) return;
if (_mqttclient != null)
{
new task(() =>
{
_isrunningreconnectthreadstart = true;
thread.sleep(5000);
while (true)
{
thread.sleep(1000);
if (!isconnect)
{
writelog($"客户端[{_clienid}]断开连接,尝试重新连接服务器中...");
int i;
for (i = 0; i < 60; i++)
{
if (isconnect) break;
writelog($"尝试第{i + 1}次连接服务器");
connectserver();
thread.sleep(1000);
if (isconnect) break;
}
_isrunningreconnectthreadstart = i < 60;
}
if (!_isrunningreconnectthreadstart) break;
}
}).start();
}
}
private void writelog(string log)
{
_callbacklog?.invoke(log);
}
/// <summary>
/// 客户端连接状态
/// </summary>
public bool isconnect => _mqttclient?.isconnected == true;
/// <summary>
/// 连接服务器
/// </summary>
/// <param name="serverip">服务器ip</param>
/// <param name="serverport">服务器端口</param>
/// <param name="topic"></param>
public async void connectserver(string serverip, int serverport)
{
_serverip = serverip;
_nserverport = serverport;
await task.run(() => { connectserver(); });
}
/// <summary>
/// 关闭客户端,断开客户端和服务器的连接
/// </summary>
public void closeclient()
{
_mqttclient.disconnectasync();
}
/// <summary>
/// 发送消息
/// </summary>
/// <param name="topic">发送主题</param>
/// <param name="cmd">发送内容</param>
[obsolete("obsolete")]
public void publishasync(string topic, string cmd)
{
var bytes = encoding.utf8.getbytes(cmd);
var mode = mqttqualityofservicelevel.atmostonce;
var appmsg = new mqttapplicationmessage(topic, bytes, mode, false);
_mqttclient.publishasync(appmsg);//发送消息
}
/// <summary>
/// 订阅主题
/// </summary>
/// <param name="topics">主题标识</param>
public void subscribeasync(params string[] topics)
{
foreach (var topic in topics)
{
if (!_subscribetopics.contains(topic))
{
_subscribetopics.add(topic);
}
}
var topicfilters = _subscribetopics.select(topic => new topicfilter(topic, mqttqualityofservicelevel.atmostonce)).tolist();
_mqttclient?.subscribeasync(topicfilters);
}
/// <summary>
/// 退订已订阅主题
/// </summary>
/// <param name="topics">主题标识</param>
public void unsubscribeasync(params string[] topics)
{
if (topics == null || topics.length == 0) return;
var topicfilters = topics.select(topic => new topicfilter(topic, mqttqualityofservicelevel.atmostonce)).tolist();
_mqttclient.subscribeasync(topicfilters);
}
/// <summary>
/// 获取所有订阅主题
/// </summary>
public string[] getalltopic => _subscribetopics.toarray();
}
}
到此这篇关于c#mqtt协议服务器与客户端通讯实现(客户端包含断开重连模块)的文章就介绍到这了,更多相关c#中mqtt服务器与客户端通讯内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论