一、websocket通信过程
客户端构建一个websocket实例,并且为它绑定一个需要连接到的服务器地址,当客户端连接服务端的候,会向服务端发送一个http get报文,告诉服务端需要将通信协议切换到websocket,服务端收到http请求后将通信协议切换到websocket,同时发给客户端一个响应报文,返回的状态码为101,表示同意客户端协议转请求,并转换为websocket协议。以上过程都是利用http通信完成的,称之为websocket协议握手(websocket protocol handshake),经过握手之后,客户端和服务端就建立了websocket连接,以后的通信走的都是websocket协议了。
二、服务端实现
1.pom文件添加依赖
<!--websocket--> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-websocket</artifactid> </dependency>
2.启用springboot对websocket的支持
package com.lby.websocket.config; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; import org.springframework.web.socket.server.standard.serverendpointexporter; /** * @author liby * @date 2022-04-25 16:18 * @description: * @version: */ @configuration public class websocketconfig { @bean public serverendpointexporter serverendpointexporter() { return new serverendpointexporter(); } }
3.核心配置:websocketserver
因为websocket是类似客户端服务端的形式(采用ws协议),那么这里的websocketserver其实就相当于一个ws协议的controller
@ serverendpoint 注解是一个类层次的注解,它的功能主要是将目前的类定义成一个websocket服务器端, 注解的值将被用于监听用户连接的终端访问url地址,客户端可以通过这个url来连接到websocket服务器端
新建一个concurrenthashmap websocketmap 用于接收当前userid的websocket,方便传递之间对userid进行推送消息。
下面是具体业务代码:
package org.example.server; import cn.hutool.core.util.strutil; import cn.hutool.json.jsonobject; import org.apache.logging.log4j.logmanager; import org.apache.logging.log4j.logger; import org.springframework.stereotype.component; import javax.websocket.*; import javax.websocket.server.pathparam; import javax.websocket.server.serverendpoint; import java.io.ioexception; import java.util.concurrent.concurrenthashmap; /* 启动项目后可通过 http://websocket.jsonin.com/ 进行测试 输入网址ws://localhost:8080/websocket/1211,检测是否能进行连接 */ @serverendpoint(value = "/websocket/{userid}") @component public class websocket { private final static logger logger = logmanager.getlogger(websocket.class); /** * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的 */ private static int onlinecount = 0; /** * concurrent包的线程安全map,用来存放每个客户端对应的mywebsocket对象 */ private static concurrenthashmap<string, websocket> websocketmap = new concurrenthashmap<>(); /** * 与某个客户端的连接会话,需要通过它来给客户端发送数据 */ private session session; private string userid; /** * 连接建立成功调用的方法 */ @onopen public void onopen(session session, @pathparam("userid") string userid) { this.session = session; this.userid = userid; //加入map websocketmap.put(userid, this); addonlinecount(); //在线数加1 logger.info("用户{}连接成功,当前在线人数为{}", userid, getonlinecount()); try { sendmessage(string.valueof(this.session.getquerystring())); } catch (ioexception e) { logger.error("io异常"); } } /** * 连接关闭调用的方法 */ @onclose public void onclose() { //从map中删除 websocketmap.remove(userid); subonlinecount(); //在线数减1 logger.info("用户{}关闭连接!当前在线人数为{}", userid, getonlinecount()); } /** * 收到客户端消息后调用的方法 * * @param message 客户端发送过来的消息 */ @onmessage public void onmessage(string message, session session) { logger.info("来自客户端用户:{} 消息:{}",userid, message); //群发消息 for (string item : websocketmap.keyset()) { try { websocketmap.get(item).sendmessage(message); } catch (ioexception e) { e.printstacktrace(); } } } /** * 发生错误时调用 * */ @onerror public void onerror(session session, throwable error) { logger.error("用户错误:" + this.userid + ",原因:" + error.getmessage()); error.printstacktrace(); } /** * 向客户端发送消息 */ public void sendmessage(string message) throws ioexception { this.session.getbasicremote().sendtext(message); //this.session.getasyncremote().sendtext(message); } /** * 通过userid向客户端发送消息 */ public void sendmessagebyuserid(string userid, string message) throws ioexception { logger.info("服务端发送消息到{},消息:{}",userid,message); if(strutil.isnotblank(userid)&&websocketmap.containskey(userid)){ websocketmap.get(userid).sendmessage("hello"); }else{ logger.error("用户{}不在线",userid); } } /** * 群发自定义消息 */ public void sendinfo(string message) throws ioexception { for (string item : websocketmap.keyset()) { try { websocketmap.get(item).sendmessage(message); } catch (ioexception e) { e.printstacktrace(); } } } public static synchronized int getonlinecount() { return onlinecount; } public static synchronized void addonlinecount() { websocket.onlinecount++; } public static synchronized void subonlinecount() { websocket.onlinecount--; } }
三、客户端实现
1.pom文件添加依赖
<dependency> <groupid>org.java-websocket</groupid> <artifactid>java-websocket</artifactid> <version>1.5.3</version> </dependency>
2.客户端实现代码
package org.example; import lombok.sneakythrows; import lombok.extern.slf4j.slf4j; import org.java_websocket.client.websocketclient; import org.java_websocket.handshake.serverhandshake; import java.net.uri; import java.util.concurrent.executors; import java.util.concurrent.scheduledexecutorservice; import java.util.concurrent.timeunit; /** * @author kele * @date 2024/2/19 **/ @slf4j public class mywebsocketclient extends websocketclient { public mywebsocketclient(uri serveruri) { super(serveruri); } @sneakythrows @override public void onopen(serverhandshake data) { try { log.info("websocket连接已打开。"); }catch (exception e){ log.error("onopen error :{}",e.getmessage()); } } @sneakythrows @override public void onmessage(string message) { try { if (message != null && !message.isempty()) { log.info("收到消息: {}",message); } }catch (exception e){ log.error("onmessage error : {}",message); } } @override public void onclose(int code, string reason, boolean remote) { log.info("websocket连接已关闭。"); } @override public void onerror(exception ex) { log.info("websocket连接发生错误:{}", ex.getmessage()); } /** * 连接定时检查 */ public void startreconnecttask(long delay, timeunit unit) { system.out.println("websocket 心跳检查"); log.info("websocket 心跳检查"); // 以下为定时器,建议使用自定义线程池,或交给框架处理(spring) scheduledexecutorservice executorservice = executors.newsinglethreadscheduledexecutor(); executorservice.schedulewithfixeddelay(() -> { // 检查逻辑:判断当前连接是否连通 if (!this.isopen()) { system.out.println("websocket 开始重连......"); log.info("websocket 开始重连......"); // 重置连接 this.reconnect(); // 以下为错误示范 //this.close(); //this.connect(); } }, 0, delay, unit); } }
3.开启客户端连接服务,并开启定时检查websocket连接
package org.example; import lombok.extern.slf4j.slf4j; import org.springframework.stereotype.component; import javax.annotation.postconstruct; import java.net.uri; import java.util.concurrent.timeunit; @slf4j @component public class init implements runnable { public static mywebsocketclient mywebsocketclient; @postconstruct public void run () { try { //启动连接 log.info("连接websocket服务端"); log.info("项目启动"); // 服务地址 uri uri = new uri("ws://127.0.0.1:8077/websocket/123"); log.info("服务地址 -{}", uri); // 创建客户端 mywebsocketclient = new mywebsocketclient(uri); // 建立连接 mywebsocketclient.connect(); // 开启 定时检查 mywebsocketclient.startreconnecttask(5, timeunit.seconds); }catch (exception e){ e.printstacktrace(); } } }
四、可能遇到的问题及解决方案
1.websocket客户端重连机制
注意引入java-websocket的版本要高于1.5.0才有reconnect()方法
2.在使用websocket连接传输base64编码时出现错误
原因:websocket中maxmessagesize默认是8k,接收的图片大于8k导致接收失败,解决方法:适当加大maxmessagesize。
参考资料:
@onmessage(maxmessagesize = 10240000) public void onmessage(byte[] message) throws ioexception { log.info("收到图片了"); string str = base64.encodebase64string(message); // 图片base64 log.info(str.substring(0, 100)); }
到此这篇关于springboot实现websocket服务端及客户端的文章就介绍到这了,更多相关springboot websocket服务端及客户端内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论