最近想学学websocket做一个实时通讯的练手项目
主要用到的技术栈是websocket netty vue pinia mysql springboot,实现一个持久化数据,单一群聊,支持多用户的聊天界面
下面是实现的过程
后端
springboot启动的时候会占用一个端口,而netty也会占用一个端口,这两个端口不能重复,并且因为netty启动后会阻塞当前线程,因此需要另开一个线程防止阻塞住springboot
1. 编写netty服务器
个人认为,netty最关键的就是channel,可以代表一个客户端
我在这使用的是@postconstruct注解,在bean初始化后调用里面的方法,新开一个线程运行netty,因为希望netty受spring管理,所以加上了spring的注解,也可以直接在启动类里注入netty然后手动启动
@service public class nettyservice { private eventloopgroup bossgroup = new nioeventloopgroup(1); private eventloopgroup workgroup = new nioeventloopgroup(); @autowired private websockethandler websockethandler; @autowired private heartbeathandler heartbeathandler; @postconstruct public void initnetty() throws baseexception { new thread(()->{ try { start(); } catch (exception e) { throw new runtimeexception(e); } }).start(); } @predestroy public void destroy() throws baseexception { bossgroup.shutdowngracefully(); workgroup.shutdowngracefully(); } @async public void start() throws baseexception { try { channelfuture channelfuture = new serverbootstrap() .group(bossgroup, workgroup) .channel(nioserversocketchannel.class) .handler(new logginghandler(loglevel.debug)) .childhandler(new channelinitializer<niosocketchannel>() { @override protected void initchannel(niosocketchannel niosocketchannel) throws exception { niosocketchannel.pipeline() // http解码编码器 .addlast(new httpservercodec()) // 处理完整的 http 消息 .addlast(new httpobjectaggregator(64 * 1024)) // 心跳检测时长 .addlast(new idlestatehandler(300, 0, 0, timeunit.seconds)) // 心跳检测处理器 .addlast(heartbeathandler) // 支持ws协议(自定义) .addlast(new websocketserverprotocolhandler("/ws",null,true,64*1024,true,true,10000)) // ws请求处理器(自定义) .addlast(websockethandler) ; } }).bind(8081).sync(); system.out.println("netty启动成功"); channelfuture future = channelfuture.channel().closefuture().sync(); } catch (interruptedexception e){ throw new interruptedexception (); } finally { //优雅关闭 bossgroup.shutdowngracefully(); workgroup.shutdowngracefully(); } } }
服务器类只是指明一些基本信息,包含处理器类,支持的协议等等,具体的处理逻辑需要再自定义类来实现
2. 心跳检测处理器
心跳检测是指 服务器无法主动确定客户端的状态(用户可能关闭了网页,但是服务端没办法知道),为了确定客户端是否在线,需要客户端定时发送一条消息,消息内容不重要,重要的是发送消息代表该客户端仍然在线,当客户端长时间没有发送数据时,代表客户端已经下线
package org.example.payroll_management.websocket.netty.handler; @component @channelhandler.sharable public class heartbeathandler extends channelduplexhandler { @autowired private channelcontext channelcontext; private static final logger logger = loggerfactory.getlogger(heartbeathandler.class); @override public void usereventtriggered(channelhandlercontext ctx, object evt) throws exception { if (evt instanceof idlestateevent){ // 心跳检测超时 idlestateevent e = (idlestateevent) evt; logger.info("心跳检测超时"); if (e.state() == idlestate.reader_idle){ attribute<integer> attr = ctx.channel().attr(attributekey.valueof(ctx.channel().id().tostring())); integer userid = attr.get(); // 读超时,当前已经下线,主动断开连接 channelcontext.removechannel(userid); ctx.close(); } else if (e.state() == idlestate.writer_idle){ ctx.writeandflush("心跳检测"); } } super.usereventtriggered(ctx, evt); } }
3. websocket处理器
当客户端发送消息,消息的内容会发送当websocket处理器中,可以对对应的方法进行处理,我这里偷懒了,就做了一个群组,全部用户只能在同一群中聊天,不过创建多个群组,或单对单聊天也不复杂,只需要将群组的id进行保存就可以
这里就产生第一个问题了,就是springmvc的拦截器不会拦截其他端口的请求,解决方法是将token放置到请求参数中,在usereventtriggered方法中重新进行一次token检验
第二个问题,我是在拦截器中通过threadlocal保存用户id,不走拦截器在其他地方拿不到用户id,解决方法是,在usereventtriggered方法中重新保存,或者channel中可以保存附件(自身携带的数据),直接将id保存到附件中
第三个问题,消息的持久化,当用户重新打开界面时,肯定希望消息仍然存在,鉴于websocket的实时性,数据持久化肯定不能在同一个线程中完成,我在这使用blockingqueue+线程池完成对消息的异步保存,或者也可以用mq实现
不过用的executors.newsinglethreadexecutor();可能会产生oom的问题,后面可以自定义一个线程池,当任务满了之后,指定拒绝策略为抛出异常,再通过全局异常捕捉拿到对应的数据保存到数据库中,不过俺这种小项目应该不会产生这种问题
第四个问题,消息内容,这个需要前后端统一一下,确定一下传输格式就ok了,然后从json中取出数据处理
最后就是在线用户统计,这个没什么好说的,里面有对应的方法,当退出时,直接把channel踢出去就可以了
package org.example.payroll_management.websocket.netty.handler; @component @channelhandler.sharable public class websockethandler extends simplechannelinboundhandler<textwebsocketframe> { @autowired private channelcontext channelcontext; @autowired private messagemapper messagemapper; @autowired private userservice userservice; private static final logger logger = loggerfactory.getlogger(websockethandler.class); private static final blockingqueue<websocketmessagedto> blockingqueue = new arrayblockingqueue(1024 * 1024); private static final executorservice executor_service = executors.newsinglethreadexecutor(); // 提交线程 @postconstruct private void init(){ executor_service.submit(new messagehandler()); } private class messagehandler implements runnable{ // 异步保存 @override public void run() { while(true){ websocketmessagedto message = null; try { message = blockingqueue.take(); logger.info("消息持久化"); } catch (interruptedexception e) { throw new runtimeexception(e); } integer success = messagemapper.savemessage(message); if (success < 1){ try { throw new baseexception("保存信息失败"); } catch (baseexception e) { throw new runtimeexception(e); } } } } } // 当读事件发生时(有客户端发送消息) @override protected void channelread0(channelhandlercontext channelhandlercontext, textwebsocketframe textwebsocketframe) throws exception { channel channel = channelhandlercontext.channel(); // 收到的消息 string text = textwebsocketframe.text(); attribute<integer> attr = channelhandlercontext.channel().attr(attributekey.valueof(channelhandlercontext.channel().id().tostring())); integer userid = attr.get(); logger.info("接收到用户id为 {} 的消息: {}",userid,text); // todo 将text转成json,提取里面的数据 websocketmessagedto websocketmessage = jsonutil.tobean(text, websocketmessagedto.class); if (websocketmessage.gettype().equals("心跳检测")){ logger.info("{}发送心跳检测",userid); } else if (websocketmessage.gettype().equals("群发")){ channelgroup channelgroup = channelcontext.getchannelgroup(null); websocketmessagedto messagedto = jsonutil.tobean(text, websocketmessagedto.class); websocketmessagedto websocketmessagedto = new websocketmessagedto(); websocketmessagedto.settype("群发"); websocketmessagedto.settext(messagedto.gettext()); websocketmessagedto.setreceiver("all"); websocketmessagedto.setsender(string.valueof(userid)); websocketmessagedto.setsenddate(timeutil.timeformat("yyyy-mm-dd")); blockingqueue.add(websocketmessagedto); channelgroup.writeandflush(new textwebsocketframe(jsonutil.tojsonprettystr(websocketmessagedto))); } else{ channel.writeandflush("请发送正确的格式"); } } // 建立连接后触发(有客户端建立连接请求) @override public void channelactive(channelhandlercontext ctx) throws exception { logger.info("建立连接"); super.channelactive(ctx); } // 连接断开后触发(有客户端关闭连接请求) @override public void channelinactive(channelhandlercontext ctx) throws exception { attribute<integer> attr = ctx.channel().attr(attributekey.valueof(ctx.channel().id().tostring())); integer userid = attr.get(); logger.info("用户id:{} 断开连接",userid); channelgroup channelgroup = channelcontext.getchannelgroup(null); channelgroup.remove(ctx.channel()); channelcontext.removechannel(userid); websocketmessagedto websocketmessagedto = new websocketmessagedto(); websocketmessagedto.settype("用户变更"); list<onlineuservo> onlineuser = userservice.getonlineuser(); websocketmessagedto.settext(jsonutil.tojsonstr(onlineuser)); websocketmessagedto.setreceiver("all"); websocketmessagedto.setsender("0"); websocketmessagedto.setsenddate(timeutil.timeformat("yyyy-mm-dd")); channelgroup.writeandflush(new textwebsocketframe(jsonutil.tojsonstr(websocketmessagedto))); super.channelinactive(ctx); } // 建立连接后触发(客户端完成连接) @override public void usereventtriggered(channelhandlercontext ctx, object evt) throws exception { if (evt instanceof websocketserverprotocolhandler.handshakecomplete){ websocketserverprotocolhandler.handshakecomplete handshakecomplete = (websocketserverprotocolhandler.handshakecomplete) evt; string uri = handshakecomplete.requesturi(); logger.info("uri: {}",uri); string token = gettoken(uri); if (token == null){ logger.warn("token校验失败"); ctx.close(); throw new baseexception("token校验失败"); } logger.info("token: {}",token); integer userid = null; try{ claims claims = jwtutil.extractclaims(token); userid = integer.valueof((string) claims.get("userid")); }catch (exception e){ logger.warn("token校验失败"); ctx.close(); throw new baseexception("token校验失败"); } // 向channel中的附件中添加用户id channelcontext.addcontext(userid,ctx.channel()); channelcontext.setchannel(userid,ctx.channel()); channelcontext.setchannelgroup(null,ctx.channel()); channelgroup channelgroup = channelcontext.getchannelgroup(null); websocketmessagedto websocketmessagedto = new websocketmessagedto(); websocketmessagedto.settype("用户变更"); list<onlineuservo> onlineuser = userservice.getonlineuser(); websocketmessagedto.settext(jsonutil.tojsonstr(onlineuser)); websocketmessagedto.setreceiver("all"); websocketmessagedto.setsender("0"); websocketmessagedto.setsenddate(timeutil.timeformat("yyyy-mm-dd")); channelgroup.writeandflush(new textwebsocketframe(jsonutil.tojsonstr(websocketmessagedto))); } super.usereventtriggered(ctx, evt); } private string gettoken(string uri){ if (uri.isempty()){ return null; } if(!uri.contains("token")){ return null; } string[] split = uri.split("\\?"); if (split.length!=2){ return null; } string[] split1 = split[1].split("="); if (split1.length!=2){ return null; } return split1[1]; } }
4. 工具类
主要用来保存用户信息的
不要问我为什么又有static又有普通方法,问就是懒得改,这里我直接保存的同一个群组,如果需要多群组的话,就需要建立sql数据了
package org.example.payroll_management.websocket; @component public class channelcontext { private static final map<integer, channel> user_channel_map = new concurrenthashmap<>(); private static final map<integer, channelgroup> user_channelgroup_map = new concurrenthashmap<>(); private static final integer group_id = 10086; private static final logger logger = loggerfactory.getlogger(channelcontext.class); public void addcontext(integer userid,channel channel){ string channelid = channel.id().tostring(); attributekey attributekey = null; if (attributekey.exists(channelid)){ attributekey = attributekey.valueof(channelid); } else{ attributekey = attributekey.newinstance(channelid); } channel.attr(attributekey).set(userid); } public static list<integer> getalluserid(){ return new arraylist<>(user_channel_map.keyset()); } public static void setchannel(integer userid,channel channel){ user_channel_map.put(userid,channel); } public static channel getchannel(integer userid){ return user_channel_map.get(userid); } public static void removechannel(integer userid){ user_channel_map.remove(userid); } public static void setchannelgroup(integer groupid,channel channel){ if(groupid == null){ groupid = group_id; } channelgroup channelgroup = user_channelgroup_map.get(groupid); if (channelgroup == null){ channelgroup =new defaultchannelgroup(globaleventexecutor.instance); user_channelgroup_map.put(group_id, channelgroup); } if (channel == null){ return ; } channelgroup.add(channel); logger.info("向group中添加channel,channelgroup已有channel数量:{}",channelgroup.size()); } public static channelgroup getchannelgroup(integer groupid){ if (groupid == null){ groupid = group_id; } return user_channelgroup_map.get(groupid); } public static void removechannelgroup(integer groupid){ if (groupid == null){ groupid = group_id; } user_channelgroup_map.remove(groupid); } }
写到这里,netty服务就搭建完成了,后面就可以等着前端的请求建立了
前端
前端我使用的vue,因为我希望当用户登录后自动建立ws连接,所以我在登录成功后添加上了ws建立请求,然后我发现,如果用户关闭网页后重新打开,因为跳过了登录界面,ws请求不会自动建立,所以需要一套全局的ws请求
不过我前端不是很好(其实后端也一般),所以很多地方肯定有更优的写法
1. pinia
使用pinia保存ws请求,方便在其他组件中调用
定义websocket实例(ws)和一个请求建立判断(wsconnect)
后面就可以通过ws接收服务的消息
import { definestore } from 'pinia' export const usewebsocketstore = definestore('websocket', { state() { return { ws: null, wsconnect: false, } }, actions: { wsinit() { if (this.ws === null) { const token = localstorage.getitem("token") if (token === null) return; this.ws = new websocket(`ws://localhost:8081/ws?token=${token}`) this.ws.onopen = () => { this.wsconnect = true; console.log("ws协议建立成功") // 发送心跳 const intervalid = setinterval(() => { if (!this.wsconnect) { clearinterval(intervalid) } const websocketmessagedto = { type: "心跳检测" } this.sendmessage(json.stringify(websocketmessagedto)); }, 1000 * 3 * 60); } this.ws.onclose = () => { this.ws = null; this.wsconnect = false; } } }, sendmessage(message) { if (message == null || message == '') { return; } if (!this.wsconnect) { console.log("ws协议没有建立") this.wsinit(); } this.ws.send(message); }, wsclose() { if (this.wsconnect) { this.ws.close(); this.wsconnect = false; } } } })
然后再app.vue中循环建立连接(建立请求重试)
const wsconnect = function () { const token = localstorage.getitem("token") if (token === null) { return; } try { if (!websocket.wsconnect) { console.log("尝试建立ws请求") websocket.wsinit(); } else { return; } } catch { wsconnect(); } }
2. 聊天组件
界面相信大伙都会画,主要说一下我遇到的问题
第一个 上拉刷新,也就是加载历史记录的功能,我用的element-plus ui,也不知道是不是我的问题,ui里面的无限滚动不是重复发送请求就是无限发送请求,而且好像没有上拉加载的功能。于是我用了intersectionobserver来解决,在页面底部加上一个div,当观察到这个div时,触发请求
第二个 滚动条到达顶部时,请求数据并放置数据,滚动条会自动滚动到顶部,并且由于观察的元素始终在顶端导致无限请求,这个其实也不是什么大问题,因为聊天的消息是有限的,没有数据之后我设置了停止观察,主要是用户体验不是很好。这是我是添加了display: flex; flex-direction: column-reverse;解决这个问题的(flex很神奇吧)。大致原理好像是垂直翻转了(例如上面我将观察元素放到div第一个子元素位置,添加flex后观察元素会到最后一个子元素位置上),也就是说当滚动条在最底部时,添加数据后,滚动条会自动滚动到最底部,不过这样体验感非常的不错
不要问我为什么数据要加 || 问就是数据懒得统一了
<style lang="scss" scoped> .chatbox { border-radius: 20px; box-shadow: rgba(0, 0, 0, 0.05) -2px 0px 8px 0px; width: 1200px; height: 600px; background-color: white; display: flex; .chat { width: 1000px; height: inherit; .chatbackground { height: 500px; overflow: auto; display: flex; flex-direction: column-reverse; .loading { text-align: center; font-size: 12px; margin-top: 20px; color: gray; } .chatitem { width: 100%; padding-bottom: 20px; .avatar { margin-left: 20px; display: flex; align-items: center; .username { margin-left: 10px; color: rgb(153, 153, 153); font-size: 13px; } } .chatitemmessage { margin-left: 60px; padding: 10px; font-size: 14px; width: 200px; word-break: break-all; max-width: 400px; line-height: 25px; width: fit-content; border-radius: 10px; height: auto; /* background-color: skyblue; */ box-shadow: rgba(0, 0, 0, 0.05) -2px 0px 8px 0px; } .senddate { font-size: 12px; margin-top: 10px; margin-left: 60px; color: rgb(187, 187, 187); } } } .chatbottom { height: 100px; background-color: #f3f3f3; border-radius: 20px; display: flex; box-shadow: rgba(0, 0, 0, 0.05) -2px 0px 8px 0px; .messageinput { border-radius: 20px; width: 400px; height: 40px; } } } .userlist { width: 200px; height: inherit; border-radius: 20px; box-shadow: rgba(0, 0, 0, 0.05) -2px 0px 8px 0px; .user { width: inherit; height: 50px; line-height: 50px; text-indent: 2em; border-radius: 20px; transition: all 0.5s ease; } } } .user:hover { box-shadow: rgba(0, 0, 0, 0.05) -2px 0px 8px 0px; transform: translatex(-5px) translatey(-5px); } </style> <template> {{hasmessage}} <div class="chatbox"> <div class="chat"> <div class="chatbackground" ref="chatbackgroundref"> <div class="chatitem" v-for="i in messagelist"> <div class="avatar"> <el-avatar :size="40" :src="imageurl" /> <div class="username">{{i.username || i.userid}}</div> </div> <div class="chatitemmessage"> {{i.text || i.content}} </div> <div class="senddate"> {{i.date || i.senddate}} </div> </div> <div class="loading" ref="loading"> 显示更多内容 </div> </div> <div class="chatbottom"> <el-input class="messageinput" v-model="message" placeholder="消息内容"></el-input> <el-button @click="sendmessage">发送消息</el-button> </div> </div> <!-- 做成无限滚动 --> <div class="userlist"> <div v-for="user in userlist"> <div class="user"> {{user.username}} </div> </div> </div> </div> </template> <script setup> import { ref, onmounted, nexttick } from 'vue' import request from '@/utils/request.js' import { usewebsocketstore } from '@/stores/usewebsocketstore' import imageurl from '@/assets/默认头像.jpg' const websocketstore = usewebsocketstore(); const chatbackgroundref = ref(null) const userlist = ref([]) const message = ref('') const messagelist = ref([ ]) const loading = ref(null) const page = ref(1); const size = 10; const hasmessage = ref(true); const observer = new intersectionobserver((entries, observer) => { entries.foreach(async entry => { if (entry.isintersecting) { observer.unobserve(entry.target) await pagequerymessage(); } }) }) onmounted(() => { observer.observe(loading.value) getonlineuserlist(); if (!websocketstore.wsconnect) { websocketstore.wsinit(); } const ws = websocketstore.ws; ws.onmessage = async (e) => { // console.log(e); const websocketmessage = json.parse(e.data); const messageobj = { username: websocketmessage.sender, text: websocketmessage.text, date: websocketmessage.senddate, type: websocketmessage.type } console.log("###") // console.log(json.parse(messageobj.text)) if (messageobj.type === "群发") { messagelist.value.unshift(messageobj) } else if (messageobj.type === "用户变更") { userlist.value = json.parse(messageobj.text) } await nexttick(); // 当发送新消息时,自动滚动到页面最底部,可以替换成消息提示的样式 // chatbackgroundref.value.scrolltop = chatbackgroundref.value.scrollheight; console.log(websocketmessage) } }) const pagequerymessage = function () { request({ url: '/api/message/pagequerymessage', method: 'post', data: { page: page.value, size: size } }).then((res) => { console.log(res) if (res.data.data.length === 0) { hasmessage.value = false; } else { observer.observe(loading.value) page.value = page.value + 1; messagelist.value.push(...res.data.data) } }) } function getonlineuserlist() { request({ url: '/api/user/getonlineuser', method: 'get' }).then((res) => { console.log(res) userlist.value = res.data.data; }) } const sendmessage = function () { if (!websocketstore.wsconnect) { websocketstore.wsinit(); } const websocketmessagedto = { type: "群发", text: message.value } websocketstore.sendmessage(json.stringify(websocketmessagedto)); } </script>
这样就实现了一个简易的聊天数据持久化,支持在线聊天的界面,总的来说websocket用起来还是十分方便的
到此这篇关于springboot+netty+vue+websocket实现在线聊天的文章就介绍到这了,更多相关springboot netty vue websocket在线聊天内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论