netty认知
netty是一款基于nio(nonblocking i/o,非阻塞io)开发的网络通信框架,相比传统socket,在并发性方面有着很大的提升。关于nio,bio,aio之间的区别,可以参考这篇博客:java中aio、bio、nio应用场景及区别
mqtt服务端实现
首先我们启动一个tcp服务,这里我用到了redis与rabbitmq,主要是与分布式web平台之间好对接
@component public class applicationeventlistener implements commandlinerunner { @value("${spring.application.name}") private string nodename; @value("${gnss.mqttserver.tcpport}") private int tcpport; @override public void run(string... args) throws exception { //启动tcp服务 starttcpserver(); //清除redis所有此节点的在线终端 redisservice redisservice = springbeanservice.getbean(redisservice.class); redisservice.deleteallonlineterminals(nodename); //将所有此节点的终端设置为离线 rabbitmessagesender messagesender = springbeanservice.getbean(rabbitmessagesender.class); messagesender.noticealloffline(nodename); } /** * 启动tcp服务 * * @throws exception */ private void starttcpserver() throws exception { //计数器,必须等到所有服务启动成功才能进行后续的操作 final countdownlatch countdownlatch = new countdownlatch(1); //启动tcp服务 tcpserver tcpserver = new tcpserver(tcpport, protocolenum.mqttcommon, countdownlatch); tcpserver.start(); //等待启动完成 countdownlatch.await(); } }
接下来我们编写一个tcpserver类实现tcp服务
@slf4j public class tcpserver extends thread{ private int port; private protocolenum protocoltype; private eventloopgroup bossgroup; private eventloopgroup workergroup; private serverbootstrap serverbootstrap = new serverbootstrap(); private countdownlatch countdownlatch; public tcpserver(int port, protocolenum protocoltype, countdownlatch countdownlatch) { this.port = port; this.protocoltype = protocoltype; this.countdownlatch = countdownlatch; bossgroup = new nioeventloopgroup(1); workergroup = springbeanservice.getbean("workergroup", eventloopgroup.class); final eventexecutorgroup executorgroup = springbeanservice.getbean("executorgroup", eventexecutorgroup.class); serverbootstrap.group(bossgroup, workergroup) .channel(nioserversocketchannel.class) .option(channeloption.so_backlog, 1024) .childoption(channeloption.so_keepalive, true) .childoption(channeloption.tcp_nodelay, true) .childhandler(new channelinitializer<socketchannel>() { @override protected void initchannel(socketchannel ch) throws exception { ch.pipeline().addlast(new idlestatehandler(mqttconstant.reader_idle_time, 0, 0, timeunit.seconds)); ch.pipeline().addlast("encoder", mqttencoder.instance); ch.pipeline().addlast("decoder", new mqttdecoder()); ch.pipeline().addlast(executorgroup, mqttbusinesshandler.instance); } }); } @override public void run() { bind(); } /** * 绑定端口启动服务 */ private void bind() { serverbootstrap.bind(port).addlistener(future -> { if (future.issuccess()) { log.info("{} mqtt服务器启动,端口:{}", protocoltype, port); countdownlatch.countdown(); } else { log.error("{} mqtt服务器启动失败,端口:{}", protocoltype, port, future.cause()); system.exit(-1); } }); } /** * 关闭服务端 */ public void shutdown() { workergroup.shutdowngracefully(); bossgroup.shutdowngracefully(); log.info("{} tcp服务器关闭,端口:{}", protocoltype, port); } }
编写一个解码器mqttbusinesshandler,实现对mqtt消息接收与处理
@slf4j @channelhandler.sharable public class mqttbusinesshandler extends simplechannelinboundhandler<object> { public static final mqttbusinesshandler instance = new mqttbusinesshandler(); private mqttmsgback mqttmsgback; private mqttbusinesshandler() { mqttmsgback= mqttmsgback.instance; } /** * 接收到消息后处理 * @param ctx * @param msg * @throws exception */ @override protected void channelread0(channelhandlercontext ctx, object msg) throws exception { if (null != msg) { mqttmessage mqttmessage = (mqttmessage) msg; mqttfixedheader mqttfixedheader = mqttmessage.fixedheader(); channel channel = ctx.channel(); if(mqttfixedheader.messagetype().equals(mqttmessagetype.connect)){ //在一个网络连接上,客户端只能发送一次connect报文。服务端必须将客户端发送的第二个connect报文当作协议违规处理并断开客户端的连接 //建议connect消息单独处理,用来对客户端进行认证管理等 这里直接返回一个connack消息 mqttmsgback.connectionack(ctx, mqttmessage); } switch (mqttfixedheader.messagetype()){ //客户端发布消息 case publish: mqttmsgback.publishack(ctx, mqttmessage); break; //发布释放 case pubrel: mqttmsgback.publishcomp(ctx, mqttmessage); break; //订阅主题 case subscribe: mqttmsgback.subscribeack(ctx, mqttmessage); break; //取消订阅主题 case unsubscribe: mqttmsgback.unsubscribeack(ctx, mqttmessage); break; //客户端发送心跳报文 case pingreq: mqttmsgback.pingresp(ctx, mqttmessage); break; //客户端主动断开连接 case disconnect: break; default: break; } } } @override public void channelinactive(channelhandlercontext ctx) throws exception { log.info("终端关闭连接,ip信息:{}", commonutil.getclientaddress(ctx)); } @override public void exceptioncaught(channelhandlercontext ctx, throwable cause) throws exception { ctx.close(); log.error("终端连接异常,ip信息:{}", commonutil.getclientaddress(ctx), cause); } /** * 服务端当读超时时会调用这个方法 */ @override public void usereventtriggered(channelhandlercontext ctx, object evt) throws exception, ioexception { ctx.close(); log.error("读超时,ip信息:{}", commonutil.getclientaddress(ctx), evt); }
我们对接收到的消息进行业务处理
@slf4j public class mqttmsgback { public static final mqttmsgback instance = new mqttmsgback(); private redisservice redisservice; private rabbitmessagesender messagesender; private environment environment; private messageserviceprovider messageserviceprovider; private mqttmsgback() { redisservice = springbeanservice.getbean(redisservice.class); messagesender = springbeanservice.getbean(rabbitmessagesender.class); environment = springbeanservice.getbean(environment.class); messageserviceprovider = springbeanservice.getbean(messageserviceprovider.class); } /** * 确认连接请求 * @param ctx * @param mqttmessage */ public void connectionack (channelhandlercontext ctx, mqttmessage mqttmessage) { mqttconnectmessage mqttconnectmessage = (mqttconnectmessage) mqttmessage; mqttfixedheader mqttfixedheaderinfo = mqttconnectmessage.fixedheader(); mqttconnectvariableheader mqttconnectvariableheaderinfo = mqttconnectmessage.variableheader(); //构建返回报文, 可变报头 mqttconnackvariableheader mqttconnackvariableheaderback = new mqttconnackvariableheader(mqttconnectreturncode.connection_accepted, mqttconnectvariableheaderinfo.iscleansession()); //构建返回报文, 固定报头 mqttfixedheader mqttfixedheaderback = new mqttfixedheader(mqttmessagetype.connack,mqttfixedheaderinfo.isdup(), mqttqos.at_most_once, mqttfixedheaderinfo.isretain(), 0x02); //构建连接回复消息体 mqttconnackmessage connack = new mqttconnackmessage(mqttfixedheaderback, mqttconnackvariableheaderback); ctx.writeandflush(connack); //获取连接者的clientid string clientidentifier = mqttconnectmessage.payload().clientidentifier(); //查询终端号码有无在平台注册 terminalproto terminalinfo = redisservice.getterminalinfobyterminalnum(clientidentifier); if (terminalinfo == null) { log.error("终端登录失败,未找到终端信息,终端号:{},ip信息:{}", clientidentifier, commonutil.getclientaddress(ctx)); ctx.close(); return; } //设置节点名 terminalinfo.setnodename(environment.getproperty("spring.application.name")); //保存终端信息和消息流水号到上下文属性中 session session = new session(terminalinfo); channelhandlercontext oldctx = sessionutil.bindsession(session, ctx); if (oldctx == null) { log.info("终端登录成功,终端id:{},终端号:{},ip信息:{}", terminalinfo.getterminalstrid(), clientidentifier, commonutil.getclientaddress(ctx)); } else { log.info("终端重复登录关闭上一个连接,终端id:{},终端号:{},ip信息:{}", terminalinfo.getterminalstrid(), clientidentifier, commonutil.getclientaddress(ctx)); oldctx.close(); } //通知上线 messagesender.noticeonline(terminalinfo); log.info("终端登录成功,终端号:{},ip信息:{}", clientidentifier, commonutil.getclientaddress(ctx)); } /** * 根据qos发布确认 * @param ctx * @param mqttmessage */ public void publishack (channelhandlercontext ctx, mqttmessage mqttmessage) { mqttpublishmessage mqttpublishmessage = (mqttpublishmessage) mqttmessage; mqttfixedheader mqttfixedheaderinfo = mqttpublishmessage.fixedheader(); mqttqos qos = (mqttqos) mqttfixedheaderinfo.qoslevel(); //得到主题 string topicname = mqttpublishmessage.variableheader().topicname(); //获取消息体 bytebuf msgbodybuf = mqttpublishmessage.payload(); log.info("收到:{}", bytebufutil.hexdump(msgbodybuf)); mqttcommonmessage msg=new mqttcommonmessage(); msg.setterminalnum(sessionutil.getterminalinfo(ctx).getterminalnum()); msg.setstrmsgid(topicname); //根据主题获取对应的主题消息处理器 basemessageservice messageservice = messageserviceprovider.getmessageservice(topicname); try { object result = messageservice.process(ctx, msg, msgbodybuf); log.info("收到{}({}),终端id:{},内容:{}", messageservice.getdesc(), topicname,msg.getterminalnum(), msg.getmsgbodyitems()); } catch (exception e) { log.error("收到{}({}),消息异常,终端id:{},消息体:{}", messageservice.getdesc(), topicname,msg.getterminalnum(),bytebufutil.hexdump(msgbodybuf), e); } switch (qos) { //至多一次 case at_most_once: break; //至少一次 case at_least_once: //构建返回报文, 可变报头 mqttmessageidvariableheader mqttmessageidvariableheaderback = mqttmessageidvariableheader.from(mqttpublishmessage.variableheader().packetid()); //构建返回报文, 固定报头 mqttfixedheader mqttfixedheaderback = new mqttfixedheader(mqttmessagetype.puback,mqttfixedheaderinfo.isdup(), mqttqos.at_most_once, mqttfixedheaderinfo.isretain(), 0x02); //构建puback消息体 mqttpubackmessage puback = new mqttpubackmessage(mqttfixedheaderback, mqttmessageidvariableheaderback); log.info("qos:at_least_once:{}",puback.tostring()); ctx.writeandflush(puback); break; //刚好一次 case exactly_once: //构建返回报文,固定报头 mqttfixedheader mqttfixedheaderback2 = new mqttfixedheader(mqttmessagetype.pubrec,false, mqttqos.at_least_once,false,0x02); //构建返回报文,可变报头 mqttmessageidvariableheader mqttmessageidvariableheaderback2 = mqttmessageidvariableheader.from(mqttpublishmessage.variableheader().packetid()); mqttmessage mqttmessageback = new mqttmessage(mqttfixedheaderback2,mqttmessageidvariableheaderback2); log.info("qos:exactly_once回复:{}"+mqttmessageback.tostring()); ctx.writeandflush(mqttmessageback); break; default: break; } } /** * 发布完成 qos2 * @param ctx * @param mqttmessage */ public void publishcomp (channelhandlercontext ctx, mqttmessage mqttmessage) { mqttmessageidvariableheader messageidvariableheader = (mqttmessageidvariableheader) mqttmessage.variableheader(); //构建返回报文, 固定报头 mqttfixedheader mqttfixedheaderback = new mqttfixedheader(mqttmessagetype.pubcomp,false, mqttqos.at_most_once,false,0x02); //构建返回报文, 可变报头 mqttmessageidvariableheader mqttmessageidvariableheaderback = mqttmessageidvariableheader.from(messageidvariableheader.messageid()); mqttmessage mqttmessageback = new mqttmessage(mqttfixedheaderback,mqttmessageidvariableheaderback); log.info("发布完成回复:{}"+mqttmessageback.tostring()); ctx.writeandflush(mqttmessageback); } /** * 订阅确认 * @param ctx * @param mqttmessage */ public void subscribeack(channelhandlercontext ctx, mqttmessage mqttmessage) { mqttsubscribemessage mqttsubscribemessage = (mqttsubscribemessage) mqttmessage; mqttmessageidvariableheader messageidvariableheader = mqttsubscribemessage.variableheader(); //构建返回报文, 可变报头 mqttmessageidvariableheader variableheaderback = mqttmessageidvariableheader.from(messageidvariableheader.messageid()); set<string> topics = mqttsubscribemessage.payload().topicsubscriptions().stream().map(mqtttopicsubscription -> mqtttopicsubscription.topicname()).collect(collectors.toset()); list<integer> grantedqoslevels = new arraylist<>(topics.size()); for (int i = 0; i < topics.size(); i++) { grantedqoslevels.add(mqttsubscribemessage.payload().topicsubscriptions().get(i).qualityofservice().value()); } // 构建返回报文 有效负载 mqttsubackpayload payloadback = new mqttsubackpayload(grantedqoslevels); // 构建返回报文 固定报头 mqttfixedheader mqttfixedheaderback = new mqttfixedheader(mqttmessagetype.suback, false, mqttqos.at_most_once, false, 2+topics.size()); // 构建返回报文 订阅确认 mqttsubackmessage suback = new mqttsubackmessage(mqttfixedheaderback,variableheaderback, payloadback); log.info("订阅回复:{}", suback.tostring()); ctx.writeandflush(suback); } /** * 取消订阅确认 * @param ctx * @param mqttmessage */ public void unsubscribeack(channelhandlercontext ctx, mqttmessage mqttmessage) { mqttmessageidvariableheader messageidvariableheader = (mqttmessageidvariableheader) mqttmessage.variableheader(); // 构建返回报文 可变报头 mqttmessageidvariableheader variableheaderback = mqttmessageidvariableheader.from(messageidvariableheader.messageid()); // 构建返回报文 固定报头 mqttfixedheader mqttfixedheaderback = new mqttfixedheader(mqttmessagetype.unsuback, false, mqttqos.at_most_once, false, 2); // 构建返回报文 取消订阅确认 mqttunsubackmessage unsuback = new mqttunsubackmessage(mqttfixedheaderback,variableheaderback); log.info("取消订阅回复:{}",unsuback.tostring()); ctx.writeandflush(unsuback); } /** * 心跳响应 * @param ctx * @param mqttmessage */ public void pingresp (channelhandlercontext ctx, mqttmessage mqttmessage) { mqttfixedheader fixedheader = new mqttfixedheader(mqttmessagetype.pingresp, false, mqttqos.at_most_once, false, 0); mqttmessage mqttmessageback = new mqttmessage(fixedheader); log.info("心跳回复:{}", mqttmessageback.tostring()); ctx.writeandflush(mqttmessageback); } }
我们可以根据客户端发布消息的主题匹配不同的处理器
最后,我们在对应的处理器里面实现对主题消息的处理逻辑,比如:定位消息,指令消息等等,比如简单实现对定位数据location主题的消息处理
@slf4j @messageservice(strmessageid = "location", desc = "定位") public class locationmessageservice extends basemessageservice<mqttcommonmessage> { @autowired private rabbitmessagesender messagesender; @override public object process(channelhandlercontext ctx, mqttcommonmessage msg, bytebuf msgbodybuf) throws exception { byte[] msgbytearr = new byte[msgbodybuf.readablebytes()]; msgbodybuf.readbytes(msgbytearr); string data = new string(msgbytearr); msg.putmessagebodyitem("位置", data); return null; } }
后续
目前仅仅是实现mqtt服务端消息接收与消息回复,后续可以根据接入的物联网设备进行对应主题消息的业务处理
到此这篇关于springboot整合netty开发mqtt服务端的文章就介绍到这了,更多相关springboot mqtt服务端内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论