说明:
这是一个使用 java jdk 8 和 spring boot 实现的websocket演示项目。目的是为解决多端消息通讯的问题。
websocket 是一种基于 tcp 的全双工通信协议,核心作用是解决传统 http 协议 “请求 - 响应” 模式的局限性,实现 客户端与服务器之间的实时、双向、低延迟数据传输。
源码地址:https://gitee.com/lqh4188/web-socket
一、功能介绍
功能特性:
- 基于 maven 的 spring boot 项目骨架。
- 纯 websocket 端点 /ws ,支持用户隔离;页面通过 http 访问时使用 ws:// 连接,通过 https 访问时使用 wss:// 连接。
- 支持分片设置和缓冲区大小设置,解决传输内容限制
- 提供静态测试页面 index.html ,用于连接、发送消息、查看消息。
项目结构:
- pom.xml :spring boot 3.3,依赖 spring-boot-starter-web 和 spring-boot-starter-websocket 。(注意:spring boot 3.x 要求 jdk 17 及以上,与上文所述的 jdk 8 不一致,请以 pom.xml 中实际配置的版本为准。)
- src/main/java/com/example/websocket/websocketapplication.java :应用入口。
- src/main/java/com/example/websocket/websocketconfig.java :注册 websocket 处理器,端点为 /ws 。
- src/main/java/com/example/websocket/chatwebsockethandler.java :文本/二进制消息处理,按用户路由:消息指定 touserid 时发送给目标用户的所有会话(并回显给发送者),否则发送给发送者自己的所有会话。
- src/main/resources/static/index.html :页面内置 js,连接 ws://{host}/ws ,可发送、显示消息。
关键代码位置
- 启动类: src/main/java/com/example/websocket/websocketapplication.java:1
- websocket 配置: src/main/java/com/example/websocket/websocketconfig.java:1
- 文本消息处理器: src/main/java/com/example/websocket/chatwebsockethandler.java:1
- 静态页面: src/main/resources/static/index.html:1
测试连接
- 打开 http://localhost:8800 ,使用页面上的“连接/发送”测试
- websocket 地址: ws://localhost:8080/ws (页面连接的是 ws://{host}/ws ,因此这里的端口应与上面页面地址的端口一致,请以实际的 server.port 配置为准)
二、运行测试
可通过 url 查询参数 userid(例如 ws://{host}/ws?userid=u1 )来创建独立的连接,进行用户隔离;未携带 userid 的连接会被服务端关闭。

三、核心代码说明
由于 websocket 对单条消息的大小有限制,若内容较大,可调整缓冲区大小,并对文本消息和二进制消息进行分片处理(服务端按会话累积分片,收到最后一片后再统一处理)。
chatwebsockethandler.java代码:
package com.example.websocket;
import java.io.bytearrayoutputstream;
import java.net.uri;
import java.nio.bytebuffer;
import java.nio.charset.standardcharsets;
import java.util.map;
import java.util.set;
import java.util.concurrent.concurrenthashmap;
import org.springframework.web.socket.binarymessage;
import org.springframework.web.socket.closestatus;
import org.springframework.web.socket.textmessage;
import org.springframework.web.socket.websocketsession;
import org.springframework.web.socket.handler.abstractwebsockethandler;
import com.fasterxml.jackson.databind.objectmapper;
/**
 * websocket handler that reassembles fragmented text/binary messages and routes
 * them between users.
 *
 * <p>each connection must carry a {@code userid} query parameter; connections
 * without one are closed. a user may hold several sessions at once. a message
 * whose {@code touserid} is set is delivered to every session of that user and
 * echoed to the sender; otherwise it is delivered to every session of the sender.
 *
 * <p>NOTE(review): identifiers in this listing appear case-folded by the article
 * extraction; the casing is kept as found so the class stays consistent with the
 * surrounding import lines.
 *
 * <p>NOTE(review): fragments are accumulated without an upper bound on the total
 * size, so a client that never sends a final fragment can grow the buffer
 * indefinitely — consider enforcing a limit.
 */
public class chatwebsockethandler extends abstractwebsockethandler {

    /** open sessions grouped by user id; one user may hold several sessions. */
    private final concurrenthashmap<string, set<websocketsession>> usersessions = new concurrenthashmap<>();

    /** shared json mapper used to parse inbound and serialize outbound messages. */
    private static final objectmapper mapper = new objectmapper();

    /** partial text payloads per session id, kept until the final fragment arrives. */
    private final concurrenthashmap<string, stringbuilder> textfragments = new concurrenthashmap<>();

    /** partial binary payloads per session id, kept until the final fragment arrives. */
    private final concurrenthashmap<string, bytearrayoutputstream> binaryfragments = new concurrenthashmap<>();

    /**
     * registers a new connection under its user id, or closes it when no user id
     * was supplied.
     *
     * @param session the newly opened session
     * @throws exception if closing the rejected session fails
     */
    @override
    public void afterconnectionestablished(websocketsession session) throws exception {
        string uid = resolveuserid(session);
        if (uid == null || uid.isempty()) {
            session.close(closestatus.bad_data);
            return;
        }
        session.getattributes().put("userid", uid);
        // the add happens inside compute() so it is atomic with respect to the
        // removal in afterconnectionclosed(); adding outside the map operation could
        // put the session into a set that was concurrently dropped from the map.
        usersessions.compute(uid, (k, sessions) -> {
            set<websocketsession> target = sessions;
            if (target == null) {
                target = concurrenthashmap.newkeyset();
            }
            target.add(session);
            return target;
        });
    }

    /**
     * collects text fragments and routes the payload once the final fragment arrives.
     *
     * @param session the sending session
     * @param message a complete text message or one fragment of it
     * @throws exception if routing the assembled payload fails
     */
    @override
    protected void handletextmessage(websocketsession session, textmessage message) throws exception {
        string id = session.getid();
        if (!message.islast()) {
            textfragments.computeifabsent(id, k -> new stringbuilder()).append(message.getpayload());
            return;
        }
        // final fragment: prepend whatever was buffered for this session, if anything
        stringbuilder sb = textfragments.remove(id);
        string payload = sb != null ? sb.append(message.getpayload()).tostring() : message.getpayload();
        routepayload(session, payload);
    }

    /**
     * collects binary fragments; once the final fragment arrives the bytes are
     * decoded as utf-8 and routed like a text payload.
     *
     * @param session the sending session
     * @param message a complete binary message or one fragment of it
     * @throws exception if buffering or routing fails
     */
    @override
    protected void handlebinarymessage(websocketsession session, binarymessage message) throws exception {
        string id = session.getid();
        bytebuffer buf = message.getpayload();
        byte[] chunk = new byte[buf.remaining()];
        buf.get(chunk);
        bytearrayoutputstream acc = binaryfragments.computeifabsent(id, k -> new bytearrayoutputstream());
        acc.write(chunk);
        if (message.islast()) {
            byte[] all = acc.tobytearray();
            binaryfragments.remove(id);
            string payload = new string(all, standardcharsets.utf_8);
            routepayload(session, payload);
        }
    }

    /** declares that this handler accepts partial (fragmented) messages. */
    @override
    public boolean supportspartialmessages() {
        return true;
    }

    /**
     * releases everything held for a closed connection: buffered fragments and the
     * session's entry in the per-user registry.
     *
     * @param session the closed session
     * @param status  the close status reported for the connection
     * @throws exception declared by the overridden method
     */
    @override
    public void afterconnectionclosed(websocketsession session, closestatus status) throws exception {
        // drop half-received messages; otherwise a connection that dies in the
        // middle of a fragmented message would leave its buffer behind forever
        string id = session.getid();
        textfragments.remove(id);
        binaryfragments.remove(id);

        object v = session.getattributes().get("userid");
        if (v == null) {
            return;
        }
        string uid = string.valueof(v);
        // remove the session and, when it was the last one, the user's entry in a
        // single atomic map operation (returning null removes the mapping)
        usersessions.computeifpresent(uid, (k, sessions) -> {
            sessions.remove(session);
            return sessions.isempty() ? null : sessions;
        });
    }

    /**
     * extracts the {@code userid} query parameter from the connection uri.
     *
     * <p>NOTE(review): {@code getquery()} returns the already-decoded query, so a
     * user id containing an encoded {@code &} or {@code =} is split incorrectly.
     *
     * @param session the session whose uri is inspected
     * @return the parameter value, or {@code null} when the uri, the query or the
     *         parameter is missing
     */
    private string resolveuserid(websocketsession session) {
        uri uri = session.geturi();
        if (uri == null) {
            return null;
        }
        string q = uri.getquery();
        if (q == null || q.isempty()) {
            return null;
        }
        for (string p : q.split("&")) {
            int i = p.indexof('=');
            if (i > 0 && "userid".equals(p.substring(0, i))) {
                return p.substring(i + 1);
            }
        }
        return null;
    }

    /**
     * builds the outbound message for a received payload and delivers it.
     *
     * <p>the payload is parsed as a json {@code message}; when parsing fails the
     * raw payload becomes the content. the sender id always comes from the session,
     * never from the payload.
     *
     * @param session the sending session
     * @param payload the fully reassembled payload
     * @throws exception if serialization or sending fails
     */
    private void routepayload(websocketsession session, string payload) throws exception {
        object v = session.getattributes().get("userid");
        if (v == null) {
            return;
        }
        string fromuid = string.valueof(v);

        message outgoing = new message();
        outgoing.setfromuserid(fromuid);
        try {
            message received = mapper.readvalue(payload, message.class);
            outgoing.settouserid(received.gettouserid());
            outgoing.setcontent(received.getcontent());
            outgoing.settype(received.gettype());
        } catch (exception ignored) {
            // deliberate fallback: anything that is not a valid message json is
            // forwarded verbatim as the content
            outgoing.setcontent(payload);
        }

        string touid = outgoing.gettouserid();
        boolean isp2p = touid != null && !touid.isempty();
        set<websocketsession> targets = usersessions.get(isp2p ? touid : fromuid);

        textmessage msg = new textmessage(mapper.writevalueasstring(outgoing));

        if (targets == null || targets.isempty()) {
            // nobody to deliver to: hand the message back to the sender only
            send(session, msg);
            return;
        }

        boolean senderincluded = false;
        for (websocketsession s : targets) {
            send(s, msg);
            if (s.equals(session)) {
                senderincluded = true;
            }
        }
        // echo p2p messages to the sender, unless the sender was already one of
        // the targets (a message addressed to the sender's own user id)
        if (isp2p && !senderincluded) {
            send(session, msg);
        }
    }

    /**
     * sends a message to one session if it is still open.
     *
     * <p>sends are serialized per session because several handler threads may
     * route to the same recipient at once and the underlying session does not
     * support concurrent sends. a slow recipient therefore delays other senders
     * targeting the same session.
     *
     * @param target the recipient session
     * @param msg    the message to send
     * @throws exception if the send fails
     */
    private void send(websocketsession target, textmessage msg) throws exception {
        synchronized (target) {
            if (target.isopen()) {
                target.sendmessage(msg);
            }
        }
    }
}
配置类websocketconfig.java
package com.example.websocket;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.web.socket.websockethandler;
import org.springframework.web.socket.config.annotation.enablewebsocket;
import org.springframework.web.socket.config.annotation.websocketconfigurer;
import org.springframework.web.socket.config.annotation.websockethandlerregistry;
import org.springframework.web.socket.server.standard.servletservercontainerfactorybean;
/**
 * websocket configuration: exposes the chat handler on {@code /ws} and tunes the
 * servlet websocket container (message buffer sizes and idle timeout).
 */
@configuration
@enablewebsocket
public class websocketconfig implements websocketconfigurer {

    /**
     * maps the chat handler to the {@code /ws} endpoint.
     *
     * <p>NOTE(review): the origin pattern {@code "*"} accepts connections from any
     * origin — confirm this is intended outside of a demo.
     *
     * @param registry the registry the handler is added to
     */
    @override
    public void registerwebsockethandlers(websockethandlerregistry registry) {
        registry
                .addhandler(chathandler(), "/ws")
                .setallowedoriginpatterns("*");
    }

    /**
     * creates the handler bean that processes chat messages.
     *
     * @return a new {@code chatwebsockethandler}
     */
    @bean
    public websockethandler chathandler() {
        return new chatwebsockethandler();
    }

    /**
     * configures the websocket container limits.
     *
     * @return a factory bean with a 2 mb text buffer, a 4 mb binary buffer and a
     *         60 second idle timeout
     */
    @bean
    public servletservercontainerfactorybean createwebsocketcontainer() {
        int textbufferbytes = 2 * 1024 * 1024;
        int binarybufferbytes = 4 * 1024 * 1024;
        long idletimeoutmillis = 60_000l;

        servletservercontainerfactorybean factory = new servletservercontainerfactorybean();
        factory.setmaxtextmessagebuffersize(textbufferbytes);
        factory.setmaxbinarymessagebuffersize(binarybufferbytes);
        // sessions with no traffic for this long are closed by the container
        factory.setmaxsessionidletimeout(idletimeoutmillis);
        return factory;
    }
}
总结
到此这篇关于websocket(java版)服务核心代码的文章就介绍到这了,更多相关java websocket服务内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论