实现功能
- 1. 给某个分组推送消息
- 2. 给所有分组推送消息
- 3. 给所有用户推送消息
- 4. 给某一个用户单独推送消息(在分组中)
- 5. 给某一个用户推送消息(不在分组中)
- 6. 用户可能存在多个分组
- 7. 用户多设备登录
- 8. 监控连接心跳(后端实现不需要前端实现配合)
- 9. 失败超过5次关闭连接
使用公平锁支持并发。
添加依赖
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-websocket -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
websocket配置类
/**
 * WebSocket configuration: maps the text handler to {@code /ws/{id}} and exposes the
 * session manager and the handler as Spring beans.
 *
 * @author wangkun
 * @date 2025/8/10 19:02
 */
@Configuration
@EnableWebSocket
public class websocketconfig implements WebSocketConfigurer {

    /**
     * Registers the WebSocket handler on its URL path.
     *
     * @param registry registry used to bind handlers to paths
     */
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        // The last path segment is what the handler reads back as the client id.
        registry.addHandler(websockethandler(), "/ws/{id}")
                // NOTE(review): "*" accepts handshakes from any origin; narrow this
                // to the known front-end origins before exposing the endpoint publicly.
                .setAllowedOrigins("*");
    }

    /**
     * Creates the session manager that tracks sessions, groups and heartbeats.
     *
     * @return the manager bean
     */
    @Bean
    public websocketmanager websocketmanager() {
        return new websocketmanager();
    }

    /**
     * Creates the handler that forwards connection events to the manager.
     *
     * @return the handler bean, wired to the manager bean
     */
    @Bean
    public websockethandler websockethandler() {
        return new websockethandler(websocketmanager());
    }
}
websocket拦截器
/**
 * Text WebSocket handler that forwards connection lifecycle events and heartbeat
 * replies to the {@code websocketmanager}.
 *
 * @author wangkun
 * @date 2025/8/10 19:14
 */
@Slf4j
public class websockethandler extends TextWebSocketHandler {

    /** Manager that owns all session bookkeeping. */
    private final websocketmanager manager;

    /**
     * @param manager session manager every event is delegated to
     */
    public websockethandler(websocketmanager manager) {
        this.manager = manager;
    }

    /**
     * Registers a newly opened session under the id taken from its URI.
     *
     * @param session the session that was just established
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        manager.addsession(extractid(session), session);
    }

    /**
     * Handles an incoming text frame. Every payload is logged; only the literal
     * text {@code "pong"} is treated as an application-level heartbeat reply.
     *
     * @param session the session the frame arrived on
     * @param message the received text frame
     */
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) {
        String payload = message.getPayload();
        log.info("收到来自[{}]的pong消息: {}", extractid(session), payload);
        if ("pong".equals(payload)) {
            manager.handlepongmessage(session);
        }
    }

    /**
     * Handles a protocol-level pong frame by refreshing the session's heartbeat.
     *
     * @param session the session the pong arrived on
     * @param message the pong frame (content is not inspected)
     */
    @Override
    public void handlePongMessage(WebSocketSession session, PongMessage message) {
        manager.handlepongmessage(session);
    }

    /**
     * Unregisters a session once its connection has closed.
     *
     * @param session the closed session
     * @param status  close status reported by the container (not inspected)
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
        manager.removetosession(session);
    }

    /**
     * Extracts the client id, i.e. everything after the last {@code '/'} of the
     * request path ({@code /ws/{id}}).
     *
     * @param session session whose URI is inspected
     * @return the trailing path segment; empty when the path ends with {@code '/'}
     */
    private String extractid(WebSocketSession session) {
        // NOTE(review): getUri() is declared nullable by Spring; a null URI fails here with an NPE.
        String path = session.getUri().getPath();
        int lastSlash = path.lastIndexOf('/');
        return path.substring(lastSlash + 1);
    }
}
websocket处理器
/**
 * Registry of live WebSocket sessions. Tracks sessions per id (several devices per
 * id), group membership by id, per-session send-failure counts and heartbeat state,
 * and offers single / group / global sends.
 *
 * <p>Every operation that touches the registries runs under one fair
 * {@link ReentrantLock}; writes to a single session are additionally serialized on
 * the session object.
 *
 * @author wangkun
 * @date 2025/8/10 19:20
 */
@Slf4j
public class websocketmanager {
    // id -> sessions of that id (one entry per connected device)
    private final ConcurrentMap<String, Set<WebSocketSession>> sessions = new ConcurrentHashMap<>();
    // session -> id
    private final ConcurrentMap<WebSocketSession, String> session_map = new ConcurrentHashMap<>();
    // group id -> member ids
    private final ConcurrentMap<String, Set<String>> groups = new ConcurrentHashMap<>();
    // session -> consecutive send failures
    private final ConcurrentMap<WebSocketSession, Integer> failure_counts = new ConcurrentHashMap<>();
    // session -> time of the last successful send
    private final ConcurrentMap<WebSocketSession, Long> last_active_times = new ConcurrentHashMap<>();
    // fair lock guarding all registry operations
    private final ReentrantLock main_lock = new ReentrantLock(true);
    // session -> time of the last pong (or of registration)
    private final ConcurrentMap<WebSocketSession, Long> lastpongtimes = new ConcurrentHashMap<>();
    // session -> its periodic ping task
    private final ConcurrentMap<WebSocketSession, ScheduledFuture<?>> pingtasks = new ConcurrentHashMap<>();
    // ping period: 25 seconds
    private static final long heartbeat_interval = 25000;
    // a session with no pong for 40 seconds is closed
    private static final long heartbeat_timeout = 40000;
    // one-byte ping payload; always sent through duplicate() so this buffer is never consumed
    private static final ByteBuffer ping_payload = ByteBuffer.wrap(new byte[]{0x1});
    // runs the ping tasks plus the two periodic maintenance jobs
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(4);

    /**
     * Starts the periodic maintenance jobs: heartbeat check every 10 seconds and
     * closed-session cleanup every 5 minutes.
     */
    public websocketmanager() {
        scheduler.scheduleAtFixedRate(this::checkheartbeats, 10, 10, TimeUnit.SECONDS);
        scheduler.scheduleAtFixedRate(this::cleanexpiredsessions, 5, 5, TimeUnit.MINUTES);
    }

    /**
     * Stops the scheduler so its worker threads do not outlive the manager.
     * A public no-arg {@code shutdown} method is picked up by Spring's inferred
     * {@code @Bean} destroy-method lookup, so no extra wiring is needed.
     */
    public void shutdown() {
        scheduler.shutdownNow();
    }

    /**
     * Registers a session under an id and starts its heartbeat. A session that is
     * already registered is left untouched.
     *
     * @param id      owner id of the session
     * @param session the newly established session
     */
    public void addsession(String id, WebSocketSession session) {
        main_lock.lock();
        try {
            if (session_map.containsKey(session)) {
                return;
            }
            sessions.computeIfAbsent(id, k -> ConcurrentHashMap.newKeySet()).add(session);
            session_map.put(session, id);
            // Registration counts as the first sign of life.
            lastpongtimes.put(session, System.currentTimeMillis());
            startpingtask(session);
            log.info("[连接建立] {}: 会话:{}", id, session.getId());
        } finally {
            main_lock.unlock();
        }
    }

    /**
     * Unregisters a session and drops all per-session state.
     *
     * @param session the session to forget
     */
    public void removetosession(WebSocketSession session) {
        main_lock.lock();
        try {
            // Per-session bookkeeping is cleared unconditionally so that entries
            // written for an already-unregistered session cannot linger.
            stoppingtask(session);
            lastpongtimes.remove(session);
            failure_counts.remove(session);
            last_active_times.remove(session);

            String id = session_map.remove(session);
            if (id == null) {
                return;
            }
            Set<WebSocketSession> devices = sessions.get(id);
            if (devices != null) {
                devices.remove(session);
                if (devices.isEmpty()) {
                    sessions.remove(id);
                }
            }
            log.info("[连接关闭] {}: 会话{}: ", id, session.getId());
        } finally {
            main_lock.unlock();
        }
    }

    /**
     * Schedules the periodic ping for a session. A failed ping closes the session.
     *
     * @param session session to ping every {@code heartbeat_interval} milliseconds
     */
    private void startpingtask(WebSocketSession session) {
        ScheduledFuture<?> task = scheduler.scheduleAtFixedRate(() -> {
            main_lock.lock();
            try {
                if (!session.isOpen()) {
                    return;
                }
                try {
                    // Independent position/limit per send; the shared buffer stays intact.
                    ByteBuffer payload = ping_payload.duplicate();
                    // Same monitor as sendmessage, so a ping never interleaves with a text write.
                    synchronized (session) {
                        if (session instanceof StandardWebSocketSession) {
                            ((StandardWebSocketSession) session).getNativeSession()
                                    .getAsyncRemote().sendPing(payload);
                        } else {
                            session.sendMessage(new PingMessage(payload));
                        }
                    }
                    log.info("[ping发送] 会话{}: ", session.getId());
                } catch (Exception e) {
                    log.info("[ping失败] 会话{}: 错误{}: {}", session.getId(), e.getMessage(), session.getUri());
                    closesession(session);
                }
            } finally {
                main_lock.unlock();
            }
        }, heartbeat_interval, heartbeat_interval, TimeUnit.MILLISECONDS);
        pingtasks.put(session, task);
    }

    /**
     * Cancels the ping task of a session, if one exists.
     *
     * @param session session whose ping task is cancelled
     */
    private void stoppingtask(WebSocketSession session) {
        ScheduledFuture<?> task = pingtasks.remove(session);
        if (task != null) {
            // No interrupt: this can be reached from inside the ping task itself
            // (ping failure -> close -> remove), and the worker must finish cleanly.
            task.cancel(false);
        }
    }

    /**
     * Records a heartbeat reply for a registered session. Replies for sessions that
     * are no longer registered are ignored.
     *
     * @param session session the pong was received on
     */
    public void handlepongmessage(WebSocketSession session) {
        main_lock.lock();
        try {
            if (!session_map.containsKey(session)) {
                return;
            }
            lastpongtimes.put(session, System.currentTimeMillis());
            log.info("[pong接收] 会话{}: {}", session.getId(), session.getUri());
        } finally {
            main_lock.unlock();
        }
    }

    /**
     * Adds an id to a group, creating the group when needed.
     *
     * @param id      id to add
     * @param groupid target group
     * @return {@code true} if the id was newly added; {@code false} if it was already
     *         a member or either argument is blank
     */
    public boolean addtogroup(String id, String groupid) {
        // Validate before locking so an early return can never leave the lock held.
        if (StringUtils.isBlank(id)) {
            log.info("加入分组失败,id 为空");
            return false;
        }
        if (StringUtils.isBlank(groupid)) {
            log.info("加入分组失败,groupid 为空");
            return false;
        }
        main_lock.lock();
        try {
            Set<String> members = groups.computeIfAbsent(groupid, k -> ConcurrentHashMap.newKeySet());
            boolean flag = members.add(id);
            log.info("[加入分组] {}: → 分组{}: (当前组内: {})", id, groupid, members.size());
            return flag;
        } finally {
            main_lock.unlock();
        }
    }

    /**
     * Removes an id from a group; an emptied group is deleted.
     *
     * @param id      id to remove
     * @param groupid group to remove it from
     * @return {@code true} if the group exists (whether or not the id was a member);
     *         {@code false} if the group is unknown or either argument is blank
     */
    public boolean removetogroup(String id, String groupid) {
        if (StringUtils.isBlank(id)) {
            log.info("移除分组失败,id 为空");
            return false;
        }
        if (StringUtils.isBlank(groupid)) {
            log.info("移除分组失败,groupid 为空");
            return false;
        }
        main_lock.lock();
        try {
            Set<String> members = groups.get(groupid);
            if (members == null) {
                return false;
            }
            members.remove(id);
            if (members.isEmpty()) {
                groups.remove(groupid);
            }
            log.info("[离开分组] {}: ← 分组{}: ", id, groupid);
            return true;
        } finally {
            main_lock.unlock();
        }
    }

    /**
     * Sends a text message to every device of one id.
     *
     * @param id      recipient id
     * @param message text to send
     * @return {@code true} only if every session of the id accepted the message;
     *         {@code false} if the id is blank, has no sessions, or any send failed
     */
    public boolean sendmessages(String id, String message) {
        main_lock.lock();
        try {
            if (StringUtils.isBlank(id)) {
                log.info("消息发送失败,id 为空");
                return false;
            }
            Set<WebSocketSession> devices = sessions.get(id);
            if (devices == null || devices.isEmpty()) {
                log.info("[单发消息失败] {}: 无活跃会话", id);
                return false;
            }
            // Snapshot: sending may unregister sessions from the live set.
            List<WebSocketSession> snapshot = new ArrayList<>(devices);
            boolean allsuccess = sendtosessions(snapshot, message);
            log.info("[单发消息完成] {}: 设备数{}:成功{} ", id, snapshot.size(), allsuccess ? "全部" : "部分");
            return allsuccess;
        } finally {
            main_lock.unlock();
        }
    }

    /**
     * Sends a text message to every member of a group.
     *
     * @param groupid target group
     * @param message text to send
     * @return {@code true} only if the send to every member fully succeeded;
     *         {@code false} if the group id is blank, the group is empty/unknown, or
     *         any member could not be reached
     */
    public boolean sendgroupmessages(String groupid, String message) {
        main_lock.lock();
        try {
            if (StringUtils.isBlank(groupid)) {
                log.info("[按组发失败] groudid为空");
                return false;
            }
            Set<String> ids = groups.get(groupid);
            if (ids == null || ids.isEmpty()) {
                log.info("[按组发失败] 分组{}: 无成员", groupid);
                return false;
            }
            boolean allsuccess = true;
            // Iterate over a copy so membership changes cannot disturb the loop.
            for (String id : new ArrayList<>(ids)) {
                allsuccess &= sendmessages(id, message);
            }
            log.info("[组发完成] 分组{}: 成员数{}: 成功{}", groupid, ids.size(), allsuccess ? "全部" : "部分");
            return allsuccess;
        } finally {
            main_lock.unlock();
        }
    }

    /**
     * Broadcasts a text message to every registered session, grouped or not.
     *
     * @param message text to send
     * @return {@code true} only if every session accepted the message;
     *         {@code false} if there are no sessions or any send failed
     */
    public boolean sendglobalmessages(String message) {
        main_lock.lock();
        try {
            if (sessions.isEmpty()) {
                log.info("[广播失败] 无活跃");
                return false;
            }
            boolean allsuccess = true;
            int totalsessions = 0;
            // Copy the key set: sending may remove ids from the live map.
            Set<String> ids = new HashSet<>(sessions.keySet());
            for (String id : ids) {
                Set<WebSocketSession> devices = sessions.get(id);
                if (devices == null) {
                    continue;
                }
                List<WebSocketSession> snapshot = new ArrayList<>(devices);
                totalsessions += snapshot.size();
                allsuccess &= sendtosessions(snapshot, message);
            }
            log.info("[广播完成] 总{}: 总设备{}: 成功{} ", ids.size(), totalsessions, allsuccess ? "全部" : "部分");
            return allsuccess;
        } finally {
            main_lock.unlock();
        }
    }

    /**
     * Sends one message to each session of a snapshot, unregistering closed ones.
     *
     * @param targets sessions to send to
     * @param message text to send
     * @return {@code true} if every target was open and accepted the message
     */
    private boolean sendtosessions(List<WebSocketSession> targets, String message) {
        boolean allsuccess = true;
        for (WebSocketSession session : targets) {
            if (validatesession(session)) {
                // Closed session: already unregistered by validatesession.
                allsuccess = false;
                continue;
            }
            allsuccess &= sendmessage(session, message);
        }
        return allsuccess;
    }

    /**
     * Sends a text message on one session and records the outcome.
     *
     * @param session target session
     * @param message text to send
     * @return {@code true} if the message was written; {@code false} on any failure
     *         (the failure is counted and may close the session)
     */
    private boolean sendmessage(WebSocketSession session, String message) {
        try {
            if (!session.isOpen()) {
                throw new IllegalStateException("会话已关闭");
            }
            // One writer per session at a time.
            synchronized (session) {
                session.sendMessage(new TextMessage(message));
                recordsuccessfulsend(session);
                return true;
            }
        } catch (Exception e) {
            handlesendfailure(session, e);
            return false;
        }
    }

    /**
     * Resets the failure counter and stamps the last-active time of a session.
     *
     * @param session session that just accepted a message
     */
    private void recordsuccessfulsend(WebSocketSession session) {
        failure_counts.put(session, 0);
        last_active_times.put(session, System.currentTimeMillis());
    }

    /**
     * Counts a failed send and closes the session once it has failed 5 times
     * without an intervening success.
     *
     * @param session session the send failed on
     * @param e       the failure cause (only its message is logged)
     */
    private void handlesendfailure(WebSocketSession session, Exception e) {
        main_lock.lock();
        try {
            int failures = failure_counts.getOrDefault(session, 0) + 1;
            failure_counts.put(session, failures);
            log.info("[发送失败] 会话{}: 次数{}: 原因{}: ", session.getId(), failures, e.getMessage());
            if (failures >= 5) {
                log.info("[自动清理] 达到失败上限 会话: {}", session.getId());
                closesession(session);
            }
        } finally {
            main_lock.unlock();
        }
    }

    /**
     * Closes every session whose last pong is older than {@code heartbeat_timeout}.
     */
    private void checkheartbeats() {
        main_lock.lock();
        try {
            long currenttime = System.currentTimeMillis();
            for (WebSocketSession session : new ArrayList<>(lastpongtimes.keySet())) {
                // Boxed on purpose: the entry may be gone by the time it is read.
                Long lastpong = lastpongtimes.get(session);
                if (lastpong != null && currenttime - lastpong > heartbeat_timeout) {
                    log.info("[心跳超时] 会话{}: 最后pong{}: 前 ", session.getId(), currenttime - lastpong);
                    closesession(session);
                }
            }
        } finally {
            main_lock.unlock();
        }
    }

    /**
     * Checks whether a session must be skipped. Despite the name, the result is
     * {@code true} when the session is NOT usable.
     *
     * @param session session to check
     * @return {@code true} if the session is closed (it is unregistered as a side
     *         effect); {@code false} if it is open
     */
    private boolean validatesession(WebSocketSession session) {
        if (session.isOpen()) {
            return false;
        }
        removetosession(session);
        return true;
    }

    /**
     * Closes a session if it is still open and always unregisters it.
     *
     * @param session session to close
     */
    private void closesession(WebSocketSession session) {
        try {
            if (session.isOpen()) {
                session.close(CloseStatus.SESSION_NOT_RELIABLE);
            }
        } catch (Exception e) {
            log.info("[关闭异常] 会话{}: 原因{}: ", session.getId(), e.getMessage());
        } finally {
            removetosession(session);
        }
    }

    /**
     * Unregisters every registered session that is no longer open.
     */
    private void cleanexpiredsessions() {
        main_lock.lock();
        try {
            for (WebSocketSession session : new ArrayList<>(session_map.keySet())) {
                if (!session.isOpen()) {
                    removetosession(session);
                }
            }
        } finally {
            main_lock.unlock();
        }
    }
}
websocket工具类封装
(springutils就是一个自己封装获取spring上下文的工具)
/**
 * Static convenience facade over the {@code websocketmanager} bean, for callers
 * that cannot have the manager injected.
 *
 * @author wangkun
 * @date 2025/8/11 10:08
 */
@Component
public class websocketutils {

    /**
     * Manager bean, resolved on first use. Resolving it in a static initializer
     * would run at class-initialization time, which may be before the Spring
     * context helper is ready.
     */
    private static volatile websocketmanager web_socket_manager;

    /**
     * Returns the manager bean, looking it up once and caching it. Concurrent first
     * calls may each perform the lookup; they are expected to obtain the same bean.
     * NOTE(review): {@code springutils.getbean} is spelled as it appears in this
     * file; the helper is declared elsewhere — confirm its real name and casing.
     */
    private static websocketmanager manager() {
        websocketmanager resolved = web_socket_manager;
        if (resolved == null) {
            resolved = springutils.getbean(websocketmanager.class);
            web_socket_manager = resolved;
        }
        return resolved;
    }

    /**
     * Adds an id to a group.
     *
     * @param id      id to add
     * @param groupid target group
     * @return the manager's result for the operation
     */
    public static boolean addtogroup(String id, String groupid) {
        return manager().addtogroup(id, groupid);
    }

    /**
     * Removes an id from a group.
     *
     * @param id      id to remove
     * @param groupid group to remove it from
     * @return the manager's result for the operation
     */
    public static boolean removetogroup(String id, String groupid) {
        return manager().removetogroup(id, groupid);
    }

    /**
     * Sends a message to all devices of one id.
     *
     * @param id      recipient id
     * @param message text to send
     * @return the manager's result for the send
     */
    public static boolean sendmessage(String id, String message) {
        return manager().sendmessages(id, message);
    }

    /**
     * Sends a message to every member of a group.
     *
     * @param groupid target group
     * @param message text to send
     * @return the manager's result for the send
     */
    public static boolean sendgroupmessages(String groupid, String message) {
        return manager().sendgroupmessages(groupid, message);
    }

    /**
     * Broadcasts a message to every connected session.
     *
     * @param message text to send
     * @return the manager's result for the broadcast
     */
    public static boolean sendglobalmessages(String message) {
        return manager().sendglobalmessages(message);
    }
}
websocket控制器
/**
 * REST endpoints for exercising the WebSocket manager: group membership and
 * single / group / global sends.
 *
 * @author wangkun
 * @date 2025/8/10 19:59
 */
@RestController
@RequestMapping("/api/websocket")
public class websocketcontroller {

    /** Manager all operations are delegated to. */
    private final websocketmanager manager;

    /**
     * @param manager session manager supplied by constructor injection
     */
    public websocketcontroller(websocketmanager manager) {
        this.manager = manager;
    }

    /**
     * Test endpoint. Note that this GET has side effects: it adds id
     * {@code "test"} to group {@code "test"} and broadcasts a message to every
     * connected session before answering.
     *
     * @return the WebSocket URL template clients should connect to
     */
    @GetMapping("/test")
    public String testconnection() {
        websocketutils.addtogroup("test", "test");
        websocketutils.sendglobalmessages("压测");
        return "ws://localhost:8099/ws/{id}";
    }

    /**
     * Adds an id to a group.
     *
     * @param id      user id
     * @param groupid group id
     * @return a text summary that ends with the manager's boolean result
     */
    @PostMapping("/addtogroup")
    public String addtogroup(@RequestParam String id, @RequestParam String groupid) {
        boolean flag = manager.addtogroup(id, groupid);
        return "[" + id + "]添加到分组[" + groupid + "]:" + flag;
    }

    /**
     * Removes an id from a group.
     *
     * @param id      user id
     * @param groupid group id
     * @return a text summary that ends with the manager's boolean result
     */
    @PostMapping("/removetogroup")
    public String removetogroup(@RequestParam String id, @RequestParam String groupid) {
        boolean flag = manager.removetogroup(id, groupid);
        return "[" + id + "]从分组[" + groupid + "]移除:" + flag;
    }

    /**
     * Sends a message to all devices of one id.
     *
     * @param id      user id
     * @param message text to send
     * @return a success or failure text, depending on the manager's result
     */
    @PostMapping("/sendmessages")
    public String sendmessages(@RequestParam String id, @RequestParam String message) {
        boolean success = manager.sendmessages(id, message);
        if (success) {
            return "发送成功";
        }
        return "发送失败(无活跃连接)";
    }

    /**
     * Sends a message to every member of a group.
     *
     * @param groupid group id
     * @param message text to send
     * @return a success or failure text, depending on the manager's result
     */
    @PostMapping("/sendgroupmessages")
    public String sendtogroup(@RequestParam String groupid, @RequestParam String message) {
        boolean success = manager.sendgroupmessages(groupid, message);
        if (success) {
            return "发送成功";
        }
        return "发送失败(分组不存在或无成员)";
    }

    /**
     * Broadcasts a message to every connected session.
     *
     * @param message text to broadcast
     * @return a success or failure text, depending on the manager's result
     */
    @PostMapping("/sendglobalmessages")
    public String broadcast(@RequestParam String message) {
        boolean success = manager.sendglobalmessages(message);
        if (success) {
            return "广播成功";
        }
        return "部分发送失败";
    }
}
测试地址
ws://localhost:8099/ws/user1

剩下的测试地址都在控制器中。
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论