一、mqtt 协议简介
mqtt(message queuing telemetry transport)是一种轻量级的 发布/订阅 协议,专为物联网(iot)等低带宽、高延迟网络环境设计。核心概念包括:
- broker:消息代理(服务端),负责消息路由。
- client:发布或订阅消息的终端(客户端)。
- topic:消息的分类标识(如
sensor/temperature)。
二、mqtt 协议核心特性
mqtt(message queuing telemetry transport)是一种基于发布/订阅模型的轻量级通信协议,专为资源受限的设备和不可靠网络环境设计。其核心优势包括:
低带宽消耗:采用二进制报文格式,头部开销极小,适合物联网设备。
异步通信:通过主题(topic)实现消息的广播与定向传递,解耦消息生产者和消费者。
多级服务质量(qos):
- qos 0(最多一次):消息可能丢失,无重传机制。
- qos 1(至少一次):确保消息送达,但可能重复。
- qos 2(仅一次):严格保证消息唯一性,适用于关键指令。
离线支持:服务端可缓存客户端的保留消息(retained messages),供后续订阅者读取。
三、mqttnet 库的核心功能
mqttnet 是 .net 生态中功能完备的 mqtt 实现库,具备以下特性:
- 协议兼容性:完整支持 mqttv3.1.1 和 mqttv5 协议,后者新增了会话超时控制、原因码反馈等高级功能。
- 高性能设计:基于异步编程模型(async/await),支持高并发连接与消息吞吐。
- 跨平台支持:兼容 windows、linux、macos,可部署于云端、边缘设备或容器环境。
- 扩展性:提供灵活的拦截器(interceptors)和事件钩子,便于集成业务逻辑(如消息过滤、日志记录)。
- 安全性:支持 tls 1.3 加密通信,可通过证书或账号密码进行客户端身份验证。
所用框架
| 框架 | 版本 |
| .net | 4.7.2+ |
| mqttnet | 4.3.3+ |
四、服务端(broker)实现详解
核心职责:
- 管理客户端连接与会话状态。
- 路由消息至匹配的订阅者。
- 实施安全策略(身份验证、权限控制)。
关键配置项:
- 端口绑定:默认非加密端口为 1883,加密端口为 8883。
- 连接验证:可自定义验证逻辑,例如检查客户端 id 格式、账号密码合法性。
- 会话管理:设置会话过期时间,清理非活跃连接。
事件机制:
- 客户端连接/断开事件:用于监控设备在线状态。
- 消息拦截器:在消息发布或投递前后插入处理逻辑(如数据格式校验、敏感信息过滤)。
- 订阅管理:动态追踪主题订阅关系,支持通配符(
+单层、#多层)。
持久化扩展:
- 可集成数据库(如 sqlite、mysql)存储保留消息或会话状态,确保服务重启后数据不丢失。
以下为服务端代码:(下方console.writeline()方法可换成自己的日志方法)
public class mqttserverhelper
{
private mqttserver _server;//mqtt服务器对象
// 定义一个委托和事件(临时存储连接客户端数据)
public event eventhandler<interceptingpublisheventargs> onmessagereceived;
public event eventhandler<bool> serverstauts;
public event eventhandler<clientconnectedeventargs> clientconnected;
public event eventhandler<clientdisconnectedeventargs> clientdisconnected;
public event eventhandler<clientsubscribedtopiceventargs> clientsubscribedtopic;
public event eventhandler<clientunsubscribedtopiceventargs> clientunsubscribedtopic;
/// <summary>
/// 初始化mqtt服务并启动服务
/// </summary>
/// <param name="ip">ipv4地址</param>
/// <param name="port">端口:0~65535之间</param>
public task startmqtserver(string ip, int port)
{
mqtserveroptions mqtserveroptions = new mqtserveroptionsbuilder()
.withdefaultendpoint()
.withdefaultendpointboundipadres(system.net.ipadres.parse(ip)
.withdefaultendpointport(port)
.withdefaultcomunicationtimeout(timespan.frommiliseconds(500) .build();
_server = new mqtfactory().createmqtserver(mqtserveroptions); / 创建mqt服务端对象
_server.validatingconectionasync += server_validatingconectionasync; /验证用户名和密码
_server.clientconectedasync += server_clientconectedasync; /绑定客户端连接事件
_server.clientdisconectedasync += server_clientdisconectedasync; /绑定客户端断开事件
_server.clientsubscribedtopicasync += server_clientsubscribedtopicasync; /绑定客户端订阅主题事件
_server.clientunsubscribedtopicasync += server_clientunsubscribedtopicasync; /绑定客户端退订主题事件
_server.interceptingpublishasync += server_interceptingpublishasync; /消息接收事件
_server.clientacknowledgedpublishpacketasync += server_clientacknowledgedpublishpacketasync; /处理客户端确认发布的数据包
_server.interceptingclientenqueueasync += server_interceptingclientenqueueasync; /订阅拦截客户端消息队列
_server.aplicationmesagenotconsumedasync += server_aplicationmesagenotconsumedasync; /应用程序逻辑处理
_server.startedasync += server_startedasync;/绑定服务端启动事件
_server.stopedasync += server_stopedasync;/绑定服务端停止事件
return _server.startasync();
}
/// <summary>
/// 处理客户端确认发布事件
/// </summary>
/// <param name="e"></param>
private task server_aplicationmesagenotconsumedasync(aplicationmesagenotconsumedeventargs e)
{
try
{
console.writeline($"【mesagenotconsumed】-senderid:{e.senderid}-mesage:{e.aplicationmesage.convertpayloadtostring()}");
}
catch (exception ex)
{
console.writeline($"server_aplicationmesagenotconsumedasync出现异常:{ex.mesage}");
}
return task.completedtask;
}
/// <summary>
/// 订阅拦截客户端消息队列事件
/// </summary>
/// <param name="e"></param>
private task server_interceptingclientenqueueasync(interceptingclientaplicationmesageenqueueeventargs e)
{
try
{
console.writeline($"【interceptingclientenqueue】-senderid:{e.senderclientid}-mesage:{e.aplicationmesage.convertpayloadtostring()}");
}
catch (exception ex)
{
console.writeline($"server_interceptingclientenqueueasync出现异常:{ex.mesage}");
}
return task.completedtask;
}
/// <summary>
/// 当客户端处理完从mqt服务器接收到的应用消息后触发。
/// 此事件可以用于确认消息已被处理,更新应用状态,
/// </summary>
/// <param name="e"></param>
private task server_clientacknowledgedpublishpacketasync(clientacknowledgedpublishpacketeventargs e)
{
try
{
console.writeline($"【clientacknowledgedpublishpacket】-senderid:{e.clientid}-mesage:{encoding.utf8.getstring(e.publishpacket.payloadsegment.toaray()}");
}
catch (exception ex)
{
console.writeline($"server_clientacknowledgedpublishpacketasync出现异常:{ex.mesage}");
}
return task.completedtask;
}
/// <summary>
/// 服务端消息接收
/// </summary>
/// <param name="e"></param>
private task server_interceptingpublishasync(interceptingpublisheventargs e)
{
try
{
string client = e.clientid; string topic = e.aplicationmesage.topic;
string contents = e.aplicationmesage.convertpayloadtostring();
//encoding.utf8.getstring(arg.aplicationmesage.payloadsegment.toaray();
onmesagereceived?.invoke(this, e);
console.writeline($"接收到消息:client:【{client}】 topic:【{topic}】 mesage:【{contents}】");
}
catch (exception ex)
{
console.writeline($"server_interceptingpublishasync出现异常:{ex.mesage}");
}
return task.completedtask;
}
/// <summary>
/// 服务端断开事件
/// </summary>
/// <param name="e"></param>
private task server_stoppedasync(eventargs arg)
{
return task.run(new action() =>
{
serverstauts?.invoke(this, false);
console.writeline($"服务端【ip:port】已停止mqt");
});
}
/// <summary>
/// 服务端启动事件
/// </summary>
/// <param name="e"></param>
public task server_startedasync(eventargs e)
{
return task.run(new action() =>
{
serverstauts?.invoke(this, true);
console.writeline($"服务端【ip:port】已启用mqt");
});
}
/// <summary>
/// 客户端退订主题事件
/// </summary>
/// <param name="e"></param>
private task server_clientunsubscribedtopicasync(clientunsubscribedtopiceventargs e)
{
return task.run(new action() =>
{
clientunsubscribedtopic?.invoke(this, e);
console.writeline($"客户端【{e.clientid}】退订主题【{e.topicfilter}】");
});
}
/// <summary>
/// 客户端订阅主题事件
/// </summary>
/// <param name="e"></param>
private task server_clientsubscribedtopicasync(clientsubscribedtopiceventargs e)
{
return task.run(new action() =>
{
clientsubscribedtopic?.invoke(this, e);
console.writeline($"客户端【{e.clientid}】订阅主题【{e.topicfilter.topic}】");
});
}
/// <summary>
/// 客户端断开事件
/// </summary>
/// <param name="e"></param>
private task server_clientdisconectedasync(clientdisconectedeventargs e)
{
return task.run(new action() =>
{
clientdisconected?.invoke(this, e);
console.writeline($"客户端已断开.clientid:【{e.clientid}】,endpoint:【{e.endpoint}】.reasoncode:【{e.reasoncode}】,disconecttype:【{e.disconecttype}】");
});
}
/// <summary>
/// 绑定客户端连接事件
/// </summary>
/// <param name="e"></param>
private task server_clientconectedasync(clientconectedeventargs e)
{
return task.run(new action() =>
{
clientconected?.invoke(this, e);
console.writeline($"客户端已连接.clientid:【{e.clientid}】,endpoint:【{e.endpoint}】");
});
}
/// <summary>
/// 验证客户端事件
/// </summary>
/// <param name="e"></param>
private task server_validatingconectionasync(validatingconectioneventargs e)
{
return task.run(new action() =>
{
if (e.pasword = "")
{
e.reasoncode = mqtconectreasoncode.suces;
console.writeline($"客户端已验证成功.clientid:【{e.clientid}】,endpoint:【{e.endpoint}】");
}
else
{
e.reasoncode = mqtconectreasoncode.badusernameorpasword;
console.writeline($"客户端验证失败.clientid:【{e.clientid}】,endpoint:【{e.endpoint}】");
}
});
}
}
五、客户端(client)实现详解
连接策略:
- 保活机制:通过心跳包(keep alive)维持长连接,适应网络波动。
消息交互模式:
- 发布消息:指定目标主题、负载数据和 qos 级别,可选择设置保留标志。
- 订阅主题:支持单主题、多主题或通配符订阅,服务端将推送匹配的消息。
异步处理:
- 使用事件委托或异步方法处理接收到的消息,避免阻塞主线程。
以下为客户端代码:
/// <sumary>
/// mqt客户端帮助类
/// </sumary>
public clas mqtclienthelper
{
private imqtclient _client;
/// <sumary>
/// 接收消息
/// </sumary>
public mqtreceivedmesagehandle receivedmesage;
public bol isconected { get; set; } = false;
public bol isdisconected { get; set; } = true;
private string _serverip; private int _serverport;
/// <sumary>
/// 订阅主题集合
/// </sumary>
private dictionary<string, bol> _subscribetopiclist = nul;
#region 连接/断开服务端
/// <sumary>
/// 连接服务端
/// </sumary>
/// <param name="serverip">服务端ip</param>
/// <param name="serverport">服务端口号</param>
public void start(string serverip, int serverport)
{
this._serverip = serverip;
this._serverport = serverport;
if (!string.isnulorempty(serverip) & !string.isnulorwhitespace(serverip) & serverport > 0)
{
try
{
var options = new mqtclientoptions()
{
clientid = "客户端2"//guid.newguid().tostring("n")
};
options.chaneloptions = new mqtclienttcpoptions()
{
server = serverip, port = serverport
}; //options.credentials = new mqtclientcredentials(username, encoding.default.getbytes(pasword);
options.cleansesion = true;
options.kepaliveperiod = timespan.fromseconds(10);
if (_client != nul)
{
_client.disconectasync();
_client = nul;
}
_client = new mqtfactory().createmqtclient();
_client.conectedasync += client_conectedasync; //绑定客户端连接事件
_client.disconectedasync += client_disconectedasync; //绑定客户端断开连接事件
_client.aplicationmesagereceivedasync += client_aplicationmesagereceivedasync; /绑定消息接收事件
_client.conectasync(options); //连接
}
catch (exception ex)
{
/slog.loger.eror("mqt客户端连接服务端错误:{0}", ex.mesage);
}
}
else
{
/slog.loger.warning("mqt服务端地址或端口号不能为空!");
}
}
}
/// <sumary>
/// 断开mqt客户端
/// </sumary>
public void client_disconect()
{
if (_client != nul)
{
_client.disconectasync();
_client.dispose();
console.writeline($"关闭mqt客户端成功!");
}
}
/// <sumary>
/// 客户端重新mqt服务端
/// </sumary>
public void client_conectasync()
{
if (_client != nul)
{
_client.reconectasync();
console.writeline($"连接mqt服务端成功!");
}
}
#endregion
#region mqt方法
/// <sumary>
/// 客户端与服务端建立连接
/// </sumary>
/// <param name="arg"></param>
private task client_conectedasync(mqtclientconectedeventargs arg)
{
return task.run(new action() =>
{
isconected = true;
isdisconected = false;
console.writeline($"连接到mqt服务端成功.{arg.conectresult.asignedclientidentifier}");
//订阅主题(可接收来自服务端消息,与客户端发布消息不能用同一个主题)
try
{
if (_subscribetopiclist != nul & _subscribetopiclist.count > 0)
{
list<string> subscribetopics = _subscribetopiclist.keys.tolist();
foreach (var topic in subscribetopics)
subscribeasync(topic);
}
}
catch (exception ex)
{
//slog.loger.eror("mqt客户端与服务端[{0}:{1}]建立连接订阅主题错误:{2}", _serverip, _serverport, ex.mesage);
}
});
}
/// <sumary>
/// 客户端与服务端断开连接
/// </sumary> / <param name="arg"></param>
private task client_disconectedasync(mqtclientdisconectedeventargs arg)
{
return task.run(new action(async () =>
{
isconected = false;
isdisconected = true;
console.writeline($"已断开到mqt服务端的连接.尝试重新连接");
try
{
await task.delay(30);
//mqtclientoptions options = new mqtclientoptions();
//await mqtclient.conectasync(options);
await _client.reconectasync();
}
catch (exception ex)
{
//slog.loger.eror("mqt客户端与服务端[{0}:{1}]断开连接退订主题错误:{2}", _serverip, _serverport, ex.mesage);
}
});
}
/// <sumary>
/// 客户端与服务端重新连接
/// </sumary>
/// <returns></returns>
public task reconectedasync()
{
try
{
if (_client != nul)
{
_client.reconectasync();
}
}
catch (exception ex)
{
// slog.loger.eror("mqt客户端与服务端[{0}:{1}]重新连接退订主题错误:{2}", _serverip, _serverport, ex.mesage);
}
return task.completedtask;
}
/// <sumary>
/// 客户端收到消息
/// </sumary>
/// <param name="arg"></param>
private task client_aplicationmesagereceivedasync(mqtaplicationmesagereceivedeventargs arg)
{
try
{
return task.run(new action() =>
{
string msg = arg.aplicationmesage.convertpayloadtostring();
console.writeline($"接收消息:{msg}\nqos={arg.aplicationmesage.qualityofservicelevel}\n客户端={arg.clientid}\n主题:{arg.aplicationmesage.topic}");
});
}
catch (exception ex)
{
//slog.loger.eror("mqt收到来自服务端[{0}]消息错误:{1}", arg != nul ? arg.clientid : ", ex.mesage);
}
return task.completedtask;
}
#endregion
#region 订阅主题
/// <sumary>
/// 订阅主题
/// </sumary>
/// <param name="topic">主题</param>
public void subscribeasync(string topic)
{
try
{
if (_subscribetopiclist = nul)
_subscribetopiclist = new dictionary<string, bol>();
if (_subscribetopiclist.containskey(topic) & _subscribetopiclist[topic])
{
//slog.loger.warning("mqt客户端已经订阅主题[{0}],不能重复订阅", topic);
return;
}
//订阅主题
_client?.subscribeasync(topic, mqtqualityofservicelevel.atleastonce);
//添加订阅缓存
bol issubscribed = _client != nul & _client.isconected ? true : false;
if (!_subscribetopiclist.containskey(topic)
_subscribetopiclist.ad(topic, issubscribed);
else
_subscribetopiclist[topic] = issubscribed;
}
catch (exception ex)
{
//slog.loger.eror("mqt客户端订阅主题[{0}]错误:{1}", topic, ex.mesage);
}
}
/// <sumary>
/// 订阅主题集合
/// </sumary>
/// <param name="topiclist">主题集合</param>
public void subscribeasync(list<string> topiclist)
{
try
{
if (topiclist = nul | topiclist.count = 0)
return;
foreach (var topic in topiclist)
subscribeasync(topic);
}
catch (exception ex)
{
//slog.loger.eror("mqt客户端订阅主题集合错误:{0}", ex.mesage);
}
}
/// <sumary>
/// 退订主题
/// </sumary>
/// <param name="topic">主题</param>
/// <param name="isremove">是否移除缓存</param>
public void unsubscribeasync(string topic, bol isremove = true)
{
try
{
if (_subscribetopiclist = nul | _subscribetopiclist.count = 0)
{
//slog.loger.warning("mqt客户端退订主题[{0}]不存在", topic);
return;
}
if (!_subscribetopiclist.containskey(topic)
{
//slog.loger.warning("mqt客户端退订主题[{0}]不存在", topic);
return;
}
//退订主题
_client.unsubscribeasync(topic);
//修改订阅主题缓存状态
if (isremove)
_subscribetopiclist.remove(topic);
else
_subscribetopiclist[topic] = false;
}
catch (exception ex)
{
//slog.loger.eror("mqt客户端退订主题[{0}]错误:{1}", topic, ex.mesage);
}
}
/// <sumary>
/// 退订主题集合
/// </sumary>
/// <param name="topiclist">主题集合</param>
/// <param name="isremove">是否移除缓存</param>
public void unsubscribeasync(list<string> topiclist, bol isremove = true)
{
try
{
if (topiclist = nul | topiclist.count = 0)
return;
foreach (var topic in topiclist)
unsubscribeasync(topic, isremove);
}
catch (exception ex)
{
//slog.loger.eror("mqt客户端退订主题集合错误:{0}", ex.mesage);
}
}
/// <sumary>
/// 订阅主题是否存在
/// </sumary>
/// <param name="topic">主题</param>
public bol isexistsubscribeasync(string topic)
{
try
{
if (_subscribetopiclist = nul | _subscribetopiclist.count = 0)
return false;
if (!_subscribetopiclist.containskey(topic)
return false;
return _subscribetopiclist[topic];
}
catch (exception ex)
{
//slog.loger.eror("mqt客户端订阅主题[{0}]是否存在错误:{1}", topic, ex.mesage); return false;
}
}
#endregion
#region 发布消息
/// <sumary>
/// 发布消息
/// 与客户端接收消息不能用同一个主题
/// </sumary>
/// <param name="topic">主题</param>
/// <param name="mesage">消息</param>
public async void publishmesage(string topic, string mesage)
{
try
{
if (_client != nul)
{
if (string.isnulorempty(mesage) | string.isnulorwhitespace(mesage)
{
//slog.loger.warning("mqt客户端不能发布为空的消息!");
return;
}
mqtclientpublishresult result = await _client.publishstringasync(topic,mesage,mqtqualityofservicelevel.atleastonce);//恰好一次, qos 级别1
console.writeline($"发布消息-主题:{topic},消息:{mesage},结果: {result.reasoncode}");
}
else
{
//slog.loger.warning("mqt客户端未连接服务端,不能发布主题为[{0}]的消息:{1}", topic, mesage);
return;
}
}
catch (exception ex)
{
//slog.loger.eror("mqt客户端发布主题为[{0}]的消息:{1},错误:{2}", topic, mesage, ex.mesage);
}
}
#endregion
}六、总结
通过 mqttnet 构建的 mqtt 通信系统,能够为物联网、实时消息推送等场景提供高效、可靠的解决方案。开发过程中需重点关注通信模式设计、安全策略实施及性能调优,同时结合具体业务需求灵活运用 qos、保留消息等特性。建议参考官方文档和社区最佳实践,逐步扩展功能(如集群部署、消息持久化),以满足大规模应用需求。
到此这篇关于c#使用mqttnet实现服务端与客户端的通讯的示例的文章就介绍到这了,更多相关c# mqttnet通讯内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论