当前位置: 代码网 > it编程>编程语言>Java > C#MQTT编程06--MQTT服务器和客户端(winform版)

C#MQTT编程06--MQTT服务器和客户端(winform版)

2024年08月02日 Java 我要评论
介绍完基础理论部分,下面在Windows平台上搭建一个简单的MQTT应用,进行简单的应用,整体架构如下图所示;消息模型:运用MQTT协议,设备可以很方便地连接到物联网云服务,管理设备并处理数据,最后应用到各种业务场景,如下图所示前面介绍过,MQTT可以运行在几乎所有的平台,windows,linux什么的都可以,各种语言都有实现MQTT的组件,.net也好,Java也好,都有封装好的mqtt服务器和客户端组件或插件来实现,本系列是在.net平台下实现mqtt的服务器和客户端。

1、前言

介绍完基础理论部分,下面在windows平台上搭建一个简单的mqtt应用,进行简单的应用,整体架构如下图所示;

消息模型:

运用mqtt协议,设备可以很方便地连接到物联网云服务,管理设备并处理数据,最后应用到各种业务场景,如下图所示

前面介绍过,mqtt可以运行在几乎所有的平台,windows,linux什么的都可以,各种语言都有实现mqtt的组件,.net也好,java也好,都有封装好的mqtt服务器和客户端组件或插件来实现,本系列是在.net平台下实现mqtt的服务器和客户端。

常见的mqtt服务器包括eclipse mosquitto、emq x、hivemq、rabbitmq、mqttnet等,本系列文章都是基于.net平台的mqtt服务通信,开发环境vs2022,.net framework4.8。

2、服务器搭建

1、创建项目方案

 

 2、添加库引用

 

 

3、ui布局

 

 

4、控件代码

“启动”按钮

停止按钮

 窗体加载

全部完整代码 

注意这里面用到了多线程及任务task,委托action,异步async及await的技术,在mqtt中必须使用这些技术,否则界面会卡死。

using mqttnet.client.receiving;
using mqttnet.server;
using mqttnet;
using system;
using system.collections.generic;
using system.componentmodel;
using system.data;
using system.drawing;
using system.linq;
using system.text;
using system.threading.tasks;
using system.windows.forms;
using mqttnet.protocol;

namespace mqttnetserverforms
{
    public partial class form1 : form
    {

        private imqttserver server;//mqtt服务器对象
        list<topicitem> topics = new list<topicitem>();

        public form1()
        {
            initializecomponent();
        }

        private void form1_load(object sender, eventargs e)
        {
            //创建服务器对象
            server = new mqttfactory().createmqttserver();
            server.applicationmessagereceivedhandler = new mqttapplicationmessagereceivedhandlerdelegate(new action<mqttapplicationmessagereceivedeventargs>(server_applicationmessagereceived));//绑定消息接收事件
            server.clientconnectedhandler = new mqttserverclientconnectedhandlerdelegate(new action<mqttserverclientconnectedeventargs>(server_clientconnected));//绑定客户端连接事件
            server.clientdisconnectedhandler = new mqttserverclientdisconnectedhandlerdelegate(new action<mqttserverclientdisconnectedeventargs>(server_clientdisconnected));//绑定客户端断开事件
            server.clientsubscribedtopichandler = new mqttserverclientsubscribedhandlerdelegate(new action<mqttserverclientsubscribedtopiceventargs>(server_clientsubscribedtopic));//绑定客户端订阅主题事件
            server.clientunsubscribedtopichandler = new mqttserverclientunsubscribedtopichandlerdelegate(new action<mqttserverclientunsubscribedtopiceventargs>(server_clientunsubscribedtopic));//绑定客户端退订主题事件
            server.startedhandler = new mqttserverstartedhandlerdelegate(new action<eventargs>(server_started));//绑定服务端启动事件
            server.stoppedhandler = new mqttserverstoppedhandlerdelegate(new action<eventargs>(server_stopped));//绑定服务端停止事件
        }

        /// <summary>
        /// 绑定消息接收事件
        /// </summary>
        /// <param name="e"></param>
        private void server_applicationmessagereceived(mqttapplicationmessagereceivedeventargs e)
        {
            string msg = e.applicationmessage.convertpayloadtostring();
            writelog(">>> 收到消息:" + msg + ",qos =" + e.applicationmessage.qualityofservicelevel + ",客户端=" + e.clientid + ",主题:" + e.applicationmessage.topic);
        }

        /// <summary>
        /// 绑定客户端连接事件
        /// </summary>
        /// <param name="e"></param>
        private void server_clientconnected(mqttserverclientconnectedeventargs e)
        {
            task.run(new action(() =>
            {
                lbclients.begininvoke(new action(() =>
                {
                    lbclients.items.add(e.clientid);
                }));
            }));
            writelog(">>> 客户端" + e.clientid + "连接");
        }

        /// <summary>
        /// 绑定客户端断开事件
        /// </summary>
        /// <param name="e"></param>
        private void server_clientdisconnected(mqttserverclientdisconnectedeventargs e)
        {
            task.run(new action(() =>
            {
                lbclients.begininvoke(new action(() =>
                {
                    lbclients.items.remove(e.clientid);
                }));
            }));
            writelog(">>> 客户端" + e.clientid + "断开");
        }

        /// <summary>
        /// 绑定客户端订阅主题事件
        /// </summary>
        /// <param name="e"></param>
        private void server_clientsubscribedtopic(mqttserverclientsubscribedtopiceventargs e)
        {
            task.run(new action(() =>
            {
                var topic = topics.firstordefault(t => t.topic == e.topicfilter.topic);
                if (topic == null)
                {
                    topic = new topicitem { topic = e.topicfilter.topic, count = 0 };
                    topics.add(topic);
                }
                if (!topic.clients.exists(c => c == e.clientid))
                {
                    topic.clients.add(e.clientid);
                    topic.count++;
                }

                lvtopic.invoke(new action(() =>
                {
                    this.lvtopic.items.clear();
                }));

                foreach (var item in this.topics)
                {

                    lvtopic.invoke(new action(() =>
                    {
                        this.lvtopic.items.add($"{item.topic}:{item.count}");
                    }));
                }
            }));
            writelog(">>> 客户端" + e.clientid + "订阅主题" + e.topicfilter.topic);
        }

        /// <summary>
        /// 绑定客户端退订主题事件
        /// </summary>
        /// <param name="e"></param>
        private void server_clientunsubscribedtopic(mqttserverclientunsubscribedtopiceventargs e)
        {
            task.run(new action(() =>
            {
                var topic = topics.firstordefault(t => t.topic == e.topicfilter);
                if (topic != null)
                {
                    topic.count--;
                    topic.clients.remove(e.clientid);
                }

                this.lvtopic.items.clear();
                foreach (var item in this.topics)
                {
                    this.lvtopic.items.add($"{item.topic}:{item.count}");
                }
            }));

            writelog(">>> 客户端" + e.clientid + "退订主题" + e.topicfilter);
        }

        /// <summary>
        /// 绑定服务端启动事件
        /// </summary>
        /// <param name="e"></param>
        private void server_started(eventargs e)
        {
            writelog(">>> 服务端已启动!");
        }

        /// <summary>
        /// 绑定服务端停止事件
        /// </summary>
        /// <param name="e"></param>
        private void server_stopped(eventargs e)
        {
            writelog(">>> 服务端已停止!");
        }

        /// <summary>
        /// 显示日志
        /// </summary>
        /// <param name="message"></param>
        public void writelog(string message)
        {
            if (txtmsg.invokerequired)
            {
                txtmsg.invoke(new action(() =>
                {
                    txtmsg.text = "";
                    txtmsg.text = (message + "\r");
                }));
            }
            else
            {
                txtmsg.text = "";
                txtmsg.text = (message + "\r");
            }
        }

        /// <summary>
        /// 启动
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        [obsolete]
        private async void btnstart_click(object sender, eventargs e)
        {
            var optionbuilder = new mqttserveroptionsbuilder()
          .withdefaultendpointboundipaddress(system.net.ipaddress.parse(this.txtip.text))
          .withdefaultendpointport(int.parse(this.txtport.text))
          .withdefaultcommunicationtimeout(timespan.frommilliseconds(5000))
          .withconnectionvalidator(t =>
          {
              string un = "", pwd = "";
              un = this.txtuname.text;
              pwd = this.txtupwd.text;
              if (t.username != un || t.password != pwd)
              {
                  t.returncode = mqttconnectreturncode.connectionrefusedbadusernameorpassword;
              }
              else
              {
                  t.returncode = mqttconnectreturncode.connectionaccepted;
              }
          });
            var option = optionbuilder.build();
            //启动
            await server.startasync(option);
            writelog(">>> 服务器启动成功");
        }

        /// <summary>
        /// 停止 
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void btnstop_click(object sender, eventargs e)
        {
            if (server != null)
            {
                server.stopasync();
            }
        }
    }
}

 注意这个端口,帐号,密码可以自己决定,ip地址也是。

 注意这里面的代码,服务器上必须注册绑定实现的几个事件:消息接收事件 ,客户端连接事件,客户端断开事件,客户端订阅主题事件,客户端退订主题事件,服务端启动事件,服务端停止事件

 server.applicationmessagereceivedhandler = new mqttapplicationmessagereceivedhandlerdelegate(new action<mqttapplicationmessagereceivedeventargs>(server_applicationmessagereceived));//绑定消息接收事件
 server.clientconnectedhandler = new mqttserverclientconnectedhandlerdelegate(new action<mqttserverclientconnectedeventargs>(server_clientconnected));//绑定客户端连接事件
 server.clientdisconnectedhandler = new mqttserverclientdisconnectedhandlerdelegate(new action<mqttserverclientdisconnectedeventargs>(server_clientdisconnected));//绑定客户端断开事件
 server.clientsubscribedtopichandler = new mqttserverclientsubscribedhandlerdelegate(new action<mqttserverclientsubscribedtopiceventargs>(server_clientsubscribedtopic));//绑定客户端订阅主题事件
 server.clientunsubscribedtopichandler = new mqttserverclientunsubscribedtopichandlerdelegate(new action<mqttserverclientunsubscribedtopiceventargs>(server_clientunsubscribedtopic));//绑定客户端退订主题事件
 server.startedhandler = new mqttserverstartedhandlerdelegate(new action<eventargs>(server_started));//绑定服务端启动事件
 server.stoppedhandler = new mqttserverstoppedhandlerdelegate(new action<eventargs>(server_stopped));//绑定服务端停止事件

启动测试服务器

 启动成功。

3、客户端创建

 1、添加项目

2、添加库引用 

注意这里添加的与服务器不一样,别混错了

 

3、ui布局

 布局使用的是常规的label,textbox,button 

4、控件代码

连接代码

订阅代码

 

发布代码

 完整代码

using mqttnet.client.options;
using mqttnet.client;
using mqttnet.extensions.managedclient;
using system;
using system.collections.generic;
using system.componentmodel;
using system.data;
using system.drawing;
using system.linq;
using system.text;
using system.threading.tasks;
using system.windows.forms;
using mqttnet;

namespace mqttnetclientforms
{
    public partial class form1 : form
    {
        private imanagedmqttclient mqttclient;//客户端mqtt对象

        public form1()
        {
            initializecomponent();
        }

        /// <summary>
        /// 连接
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private async void btnconn_click(object sender, eventargs e)
        {
            var mqttclientoptions = new mqttclientoptionsbuilder()
              .withclientid(this.txtid.text)
              .withtcpserver(this.txtip.text, int.parse(this.txtport.text))
              .withcredentials(this.txtname.text, this.txtupwd.text);

            var options = new managedmqttclientoptionsbuilder()
                        .withautoreconnectdelay(timespan.fromseconds(5))
                        .withclientoptions(mqttclientoptions.build())
                        .build();
            //开启
            await mqttclient.startasync(options);
        }

        /// <summary>
        /// 断开
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private async void btnclose_click(object sender, eventargs e)
        {
            if (mqttclient != null)
            {
                if (mqttclient.isstarted)
                {
                    await mqttclient.stopasync();
                }
                mqttclient.dispose();
            }
        }

        private void form1_load(object sender, eventargs e)
        {
            var factory = new mqttfactory();
            mqttclient = factory.createmanagedmqttclient();//创建客户端对象

            //绑定断开事件
            mqttclient.usedisconnectedhandler(async ee =>
            {
                writelog("与服务器之间的连接断开了,正在尝试重新连接");
                // 等待 5s 时间
                await task.delay(timespan.fromseconds(5));
                try
                {
                    mqttclient.useconnectedhandler(tt =>
                    {
                        writelog(">>> 连接到服务成功");
                    });
                }
                catch (exception ex)
                {
                    writelog($"重新连接服务器失败:{ex}");
                }
            });

            //绑定接收事件
            mqttclient.useapplicationmessagereceivedhandler(aa =>
            {
                try
                {
                    string msg = aa.applicationmessage.convertpayloadtostring();
                    writelog(">>> 消息:" + msg + ",qos =" + aa.applicationmessage.qualityofservicelevel + ",客户端=" + aa.clientid + ",主题:" + aa.applicationmessage.topic);
                }
                catch (exception ex)
                {
                    writelog($"+ 消息 = " + ex.message);
                }
            });

            //绑定连接事件
            mqttclient.useconnectedhandler(ee =>
            {
                writelog(">>> 连接到服务成功");
            });
        }

        /// <summary>
        /// 显示日志
        /// </summary>
        /// <param name="message"></param>
        private void writelog(string message)
        {
            if (txtmsg.invokerequired)
            {
                txtmsg.invoke(new action(() =>
                {
                    txtmsg.text = (message);
                }));
            }
            else
            {
                txtmsg.text = (message);
            }
        }

        /// <summary>
        /// 订阅
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>

        [obsolete]
        private async void btnsub_click(object sender, eventargs e)
        {
            if (string.isnullorwhitespace(this.txttopic.text))
            {
                writelog(">>> 请输入主题");
                return;
            }
            //在 mqtt 中有三种 qos 级别: 
            //at most once(0) 最多一次
            //at least once(1) 至少一次
            //exactly once(2) 恰好一次
            //await mqttclient.subscribeasync(new topicfilterbuilder().withtopic(this.tbtopic.text).withatmostonceqos().build());//最多一次, qos 级别0
            await mqttclient.subscribeasync(new topicfilterbuilder().withtopic(this.txttopic.text).withatleastonceqos().build());//恰好一次, qos 级别1 
            writelog($">>> 成功订阅");
        }

        /// <summary>
        /// 发布
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private async void btnpub_click(object sender, eventargs e)
        {
            if (string.isnullorwhitespace(this.txttopik.text))
            {
                writelog(">>> 请输入主题");
                return;
            }
            var result = await mqttclient.publishasync(
                this.txttopik.text,
                this.txtcontent.text,
                mqttnet.protocol.mqttqualityofservicelevel.atleastonce);//恰好一次, qos 级别1   
            writelog($">>> 主题:{this.txttopik.text},消息:{this.txtcontent.text},结果: {result.reasoncode}");
        }
    }
}

4、运行测试

生成编译解决方案,成功后开始测试

1、启动服务器

2、启动客户端

找到生成的客户端debug目录下的.exe文件

3、测试连接 

连接成功,服务器看到客户端上线了

4、测试订阅

再运行一个客户端,连接服务器

5、测试发布

 c1向cced主题发布一个消息,结果是c1,c2都收到了消息

同样,c2发布一个消息,c1,c2都收到了消息

 6、测试下线

c1关闭,服务器马上知道了

 5、小结

基于mqttnet的组件搭建的mqtt服务器和客户端通信成功,发布和订阅都ko,ko,ko。

讲解不易,分析不易,原创不易,整理不易,伙伴们动动你的金手指,你的支持是我最大的动力。

 

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com