当前位置: 代码网 > it编程>编程语言>Java > SpringBoot WebSocket多消息推送过程

SpringBoot WebSocket多消息推送过程

2026年01月13日 Java 我要评论
实现功能1. 给某个分组推送消息2. 给所有分组推送消息3. 给所有用户推送消息4. 给某一个用户单独推送消息(在分组中)5. 给某一个用户推送消息(不在在分组中)6. 用户可能存在多个分组7. 用户

实现功能

  • 1. 给某个分组推送消息
  • 2. 给所有分组推送消息
  • 3. 给所有用户推送消息
  • 4. 给某一个用户单独推送消息(在分组中)
  • 5. 给某一个用户推送消息(不在在分组中)
  • 6. 用户可能存在多个分组
  • 7. 用户多设备登录
  • 8. 监控连接心跳(后端实现不需要前端实现配合)
  • 9. 失败超过5次关闭连接

使用公平锁支持并发。

添加依赖

        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-websocket -->
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-websocket</artifactid>
        </dependency>

websocket配置类

/**
 * @description websocket配置类
 * @author wangkun
 * @date 2025/8/10 19:02
 * @version
 */
@configuration
@enablewebsocket
public class websocketconfig implements websocketconfigurer {

    /**
     * @description websocket拦截器配置端点
      * @param registry websocket处理程序注册表,用于注册websocket处理器和拦截器
     * @throws 可能抛出websocket相关的异常
     * @return void 无返回值
     * @date 2025-08-10 19:14:42
     * @author wangkun
     **/
    @override
    public void registerwebsockethandlers(websockethandlerregistry registry) {
        // 注册websocket处理器,并设置url路径为"/ws/{id}"
        registry.addhandler(websockethandler(), "/ws/{id}")
                // 允许所有来源的跨域请求
                .setallowedorigins("*");
    }

    /**
     * @description websocket核心处理器
     * @throws 可能抛出bean创建相关的异常
     * @return jwebsocketmanager 返回websocket管理器的实例
     * @date 2025-08-10 19:13:48
     * @author wangkun
     **/
    @bean
    public websocketmanager websocketmanager() {
        return new websocketmanager();
    }

    /**
     * @description websocket拦截处理
     * @throws 可能抛出bean创建相关的异常
     * @return jwebsockethandler 返回websocket处理器的实例
     * @date 2025-08-10 19:15:20
     * @author wangkun
     **/
    @bean // 将此方法返回的对象注册为spring容器中的bean
    public websockethandler websockethandler() {
        // 创建websocket处理器实例,并注入websocket管理器
        return new websockethandler(websocketmanager());
    }
}

websocket拦截器

/**
 * @description websocket拦截器
 * @author wangkun
 * @date 2025/8/10 19:14
 * @version
 */
@slf4j
public class websockethandler extends textwebsockethandler {

    private final websocketmanager manager; // websocket管理器,用于管理websocket会话

    /**
     * @description 构造器注入核心
      * @param manager websocket管理器实例
     * @throws
     * @return
     * @date 2025-08-21 09:27:35
     * @author wangkun
     **/
    public websockethandler(websocketmanager manager) {
        this.manager = manager;
    }

    /**
     * @param session websocket会话对象
     * @description 初始化会话
     * @throws 当会话建立过程中出现异常时抛出
     * @return void
     * @date 2025-08-10 19:17:57
     * @author wangkun
     **/
    @override
    public void afterconnectionestablished(websocketsession session) {
        manager.addsession(extractid(session), session);
    }

    /**
     * @param session websocket会话对象
     * @param message 接收到的文本消息
     * @description pong消息处理
     * @throws 当消息处理过程中出现异常时抛出
     * @return void
     * @date 2025-08-10 19:18:01
     * @author wangkun
     **/
    @override
    protected void handletextmessage(websocketsession session, textmessage message) {
        string payload = message.getpayload();
        log.info("收到来自[{}]的pong消息: {}", extractid(session), payload);
        if ("pong".equals(payload)) {
            manager.handlepongmessage(session);
        }
    }

    /**
     * @param session websocket会话对象
     * @param message 接收到的pong消息
     * @description 消息处理
     * @throws 当消息处理过程中出现异常时抛出
     * @return void
     * @date 2025-08-10 19:18:08
     * @author wangkun
     **/
    @override
    public void handlepongmessage(websocketsession session, pongmessage message) {
        manager.handlepongmessage(session);
    }

    /**
     * @param session websocket会话对象
     * @param status 会话关闭状态
     * @description 会话关闭
     * @throws 当会话关闭过程中出现异常时抛出
     * @return void
     * @date 2025-08-10 19:18:13
     * @author wangkun
     **/
    @override
    public void afterconnectionclosed(websocketsession session, closestatus status) {
        manager.removetosession(session);
    }

    /**
     * @param session websocket会话对象
     * @description 拦截请求参数处理
     * @throws 当uri解析过程中出现异常时抛出
     * @return java.lang.string 返回从uri中提取的id
     * @date 2025-08-10 19:18:22
     * @author wangkun
     **/
    private string extractid(websocketsession session) {
        // 从uri路径提取id:/ws/{id}
        string path = session.geturi().getpath();
        return path.substring(path.lastindexof('/') + 1);
    }
}

websocket处理器

/**
 * @description websocket处理器
 * @author wangkun
 * @date 2025/8/10 19:20
 * @version
 */
@slf4j
public class websocketmanager {

    // id -> 设备会话集合 (支持多设备)
    private final concurrentmap<string, set<websocketsession>> sessions = new concurrenthashmap<>();
    // 会话 -> id
    private final concurrentmap<websocketsession, string> session_map = new concurrenthashmap<>();
    // 分组id -> id集合
    private final concurrentmap<string, set<string>> groups = new concurrenthashmap<>();
    // 会话失败计数器 (session -> 失败次数)
    private final concurrentmap<websocketsession, integer> failure_counts = new concurrenthashmap<>();
    // 心跳状态监控 (session -> 最后活跃时间)
    private final concurrentmap<websocketsession, long> last_active_times = new concurrenthashmap<>();
    // 公平锁,确保所有操作的原子性
    private final reentrantlock main_lock = new reentrantlock(true);
    // 心跳状态监控
    private final concurrentmap<websocketsession, long> lastpongtimes = new concurrenthashmap<>();
    private final concurrentmap<websocketsession, scheduledfuture<?>> pingtasks = new concurrenthashmap<>();
    // 心跳间隔25秒
    private static final long heartbeat_interval = 25000;
    // 心跳超时40秒
    private static final long heartbeat_timeout = 40000;
    // 心跳ping nio包装
    private static final bytebuffer ping_payload = bytebuffer.wrap(new byte[]{0x1});

    // 线程池
    private final scheduledexecutorservice scheduler = executors.newscheduledthreadpool(4);

    public websocketmanager() {
        // 心跳检测任务
        scheduler.scheduleatfixedrate(this::checkheartbeats, 10, 10, timeunit.seconds);
        // 会话清理任务
        scheduler.scheduleatfixedrate(this::cleanexpiredsessions, 5, 5, timeunit.minutes);
    }

    /**
     * @param id
     * @param session
     * @description 建立连接添加到session
     * @throws
     * @return void
     * @date 2025-08-10 19:37:14
     * @author wangkun
     **/
    public void addsession(string id, websocketsession session) {
        main_lock.lock();
        try {
            if (!session_map.containskey(session)) {
                // 会话管理
                sessions.computeifabsent(id, k -> concurrenthashmap.newkeyset()).add(session);
                session_map.put(session, id);
                // 初始化心跳状态
                lastpongtimes.put(session, system.currenttimemillis());
                startpingtask(session);
                log.info("[连接建立] {}: 会话:{}", id, session.getid());
            }
        } finally {
            main_lock.unlock();
        }
    }

    /**
     * @param session
     * @description 移除session中的
     * @throws
     * @return void
     * @date 2025-08-10 19:40:11
     * @author wangkun
     **/
    public void removetosession(websocketsession session) {
        main_lock.lock();
        try {
            string id = session_map.get(session);
            if (id != null) {
                // 清理会话
                set<websocketsession> sessions = sessions.get(id);
                if (sessions != null) {
                    sessions.remove(session);
                    if (sessions.isempty()) {
                        sessions.remove(id);
                    }
                }
                // 清理心跳任务
                stoppingtask(session);
                lastpongtimes.remove(session);
                session_map.remove(session);

                log.info("[连接关闭] {}:  会话{}: ", id, session.getid());
            }
        } finally {
            main_lock.unlock();
        }
    }

    /**
     * @param session
     * @description 开始心跳ping
     * @throws
     * @return void
     * @date 2025-08-10 19:41:06
     * @author wangkun
     **/
    private void startpingtask(websocketsession session) {
        scheduledfuture<?> task = scheduler.scheduleatfixedrate(() -> {
            main_lock.lock();
            try {
                if (session.isopen()) {
                    try {
                        // 标准websocket ping发送
                        if (session instanceof standardwebsocketsession) {
                            ((standardwebsocketsession) session).getnativesession()
                                    .getasyncremote().sendping(ping_payload);
                        } else {
                            session.sendmessage(new pingmessage(ping_payload));
                        }
                        log.info("[ping发送] 会话{}: ", session.getid());
                    } catch (exception e) {
                        log.info("[ping失败] 会话{}: 错误{}: {}", session.getid(), e.getmessage(), session.geturi());
                        closesession(session);
                    }
                }
            } finally {
                main_lock.unlock();
            }
        }, heartbeat_interval, heartbeat_interval, timeunit.milliseconds);
        pingtasks.put(session, task);
    }

    /**
     * @param session
     * @description 心跳停止
     * @throws
     * @return void
     * @date 2025-08-10 19:41:55
     * @author wangkun
     **/
    private void stoppingtask(websocketsession session) {
        scheduledfuture<?> task = pingtasks.remove(session);
        if (task != null) {
            task.cancel(true);
        }
    }

    /**
     * @param session
     * @description 心跳pong接收
     * @throws
     * @return void
     * @date 2025-08-10 19:42:20
     * @author wangkun
     **/
    public void handlepongmessage(websocketsession session) {
        main_lock.lock();
        try {
            lastpongtimes.put(session, system.currenttimemillis());
            log.info("[pong接收] 会话{}: {}", session.getid(), session.geturi());
        } finally {
            main_lock.unlock();
        }
    }

    /**
     * @param id
     * @param groupid
     * @description 添加至分组
     * @throws
     * @return void
     * @date 2025-08-10 19:43:20
     * @author wangkun
     **/
    public boolean addtogroup(string id, string groupid) {
        main_lock.lock();
        if (stringutils.isblank(id) && stringutils.isempty(id)) {
            log.info("加入分组失败,id 为空");
            return false;
        }
        if (stringutils.isblank(groupid) && stringutils.isempty(groupid)) {
            log.info("加入分组失败,groupid 为空");
            return false;
        }
        try {
            boolean flag = groups.computeifabsent(groupid, k -> concurrenthashmap.newkeyset()).add(id);
            log.info("[加入分组] {}: → 分组{}:  (当前组内: {})", id, groupid, groups.get(groupid).size());
            return flag;
        } finally {
            main_lock.unlock();
        }
    }

    /**
     * @param id
     * @param groupid
     * @description 从分组移除
     * @throws
     * @return void
     * @date 2025-08-10 19:44:10
     * @author wangkun
     **/
    public boolean removetogroup(string id, string groupid) {
        main_lock.lock();
        try {
            if (stringutils.isblank(id) && stringutils.isempty(id)) {
                log.info("移除分组失败,id 为空");
                return false;
            }
            if (stringutils.isblank(groupid) && stringutils.isempty(groupid)) {
                log.info("移除分组失败,groupid 为空");
                return false;
            }
            set<string> groups = groups.get(groupid);
            if (groups != null) {
                groups.remove(id);
                if (groups.isempty()) {
                    groups.remove(groupid);
                }
                log.info("[离开分组] {}:  ← 分组{}: ", id, groupid);
                return true;
            }
            return false;
        } finally {
            main_lock.unlock();
        }
    }

    /**
     * @param id
     * @param message
     * @description 消息发送,多设备
     * @throws
     * @return boolean
     * @date 2025-08-10 19:44:48
     * @author wangkun
     **/
    public boolean sendmessages(string id, string message) {
        main_lock.lock();
        try {
            if (stringutils.isblank(id) && stringutils.isempty(id)) {
                log.info("消息发送失败,id 为空");
                return false;
            }
            set<websocketsession> sessions = sessions.get(id);
            if (sessions == null || sessions.isempty()) {
                log.info("[单发消息失败] {}: 无活跃会话", id);
                return false;
            }
            // 创建副本防止并发修改
            list<websocketsession> sessionscopy = new arraylist<>(sessions);
            boolean allsuccess = true;
            for (websocketsession session : sessionscopy) {
                if (validatesession(session)) {
                    allsuccess = false;
                    continue;
                }
                allsuccess &= sendmessage(session, message);
            }
            log.info("[单发消息完成] {}: 设备数{}:成功{} ", id, sessionscopy.size(), allsuccess ? "全部" : "部分");
            return allsuccess;
        } finally {
            main_lock.unlock();
        }
    }

    /**
     * @param groupid
     * @param message
     * @description 按组发送
     * @throws
     * @return boolean
     * @date 2025-08-10 19:46:15
     * @author wangkun
     **/
    public boolean sendgroupmessages(string groupid, string message) {
        main_lock.lock();
        try {
            if (stringutils.isblank(groupid)&& stringutils.isempty(message)) {
                log.info("[按组发失败] groudid为空");
                return false;
            }
            set<string> ids = groups.get(groupid);
            if (ids == null || ids.isempty()) {
                log.info("[按组发失败] 分组{}: 无成员", groupid);
                return false;
            }
            boolean allsuccess = true;
            // 创建副本防止并发修改
            for (string id : new arraylist<>(ids)) {
                allsuccess &= sendmessages(id, message);
            }
            log.info("[组发完成] 分组{}:  成员数{}:  成功{}", groupid, ids.size(), allsuccess ? "全部" : "部分");
            return allsuccess;
        } finally {
            main_lock.unlock();
        }
    }

    /**
     * @param message
     * @description 全局广播无论在不在分组
     * @throws
     * @return boolean
     * @date 2025-08-10 19:47:33
     * @author wangkun
     **/
    public boolean sendglobalmessages(string message) {
        main_lock.lock();
        try {
            if (sessions.isempty()) {
                log.info("[广播失败] 无活跃");
                return false;
            }
            boolean allsuccess = true;
            int totalsessions = 0;
            // 创建副本防止并发修改
            set<string> ids = new hashset<>(sessions.keyset());
            for (string id : ids) {
                set<websocketsession> sessions = sessions.get(id);
                if (sessions != null) {
                    // 创建会话副本
                    list<websocketsession> sessionscopy = new arraylist<>(sessions);
                    totalsessions += sessionscopy.size();
                    for (websocketsession session : sessionscopy) {
                        if (validatesession(session)) {
                            allsuccess = false;
                            continue;
                        }
                        allsuccess &= sendmessage(session, message);
                    }
                }
            }
            log.info("[广播完成] 总{}: 总设备{}: 成功{} ", ids.size(), totalsessions, allsuccess ? "全部" : "部分");
            return allsuccess;
        } finally {
            main_lock.unlock();
        }
    }

    /**
     * @param session
     * @param message
     * @description 会话发送消息
     * @throws
     * @return boolean
     * @date 2025-08-10 19:49:00
     * @author wangkun
     **/
    private boolean sendmessage(websocketsession session, string message) {
        try {
            if (!session.isopen()) {
                throw new illegalstateexception("会话已关闭");
            }
            // 会话级别的同步,锁,防止乱发
            synchronized (session) {
                session.sendmessage(new textmessage(message));
                recordsuccessfulsend(session);
                return true;
            }
        } catch (exception e) {
            handlesendfailure(session, e);
            return false;
        }
    }

    /**
     * @param session
     * @description 发送记录 ,心跳更新状态
     * @throws
     * @return void
     * @date 2025-08-10 19:50:41
     * @author wangkun
     **/
    private void recordsuccessfulsend(websocketsession session) {
        failure_counts.put(session, 0);
        last_active_times.put(session, system.currenttimemillis());
    }

    /**
     * @param session
     * @param e
     * @description 失败关闭连接
     * @throws
     * @return void
     * @date 2025-08-10 19:51:10
     * @author wangkun
     **/
    private void handlesendfailure(websocketsession session, exception e) {
        main_lock.lock();
        try {
            int failures = failure_counts.getordefault(session, 0) + 1;
            failure_counts.put(session, failures);
            log.info("[发送失败] 会话{}: 次数{}:  原因{}: ", session.getid(), failures, e.getmessage());
            if (failures >= 5) {
                log.info("[自动清理] 达到失败上限 会话: {}", session.getid());
                closesession(session);
            }
        } finally {
            main_lock.unlock();
        }
    }

    /**
     * @param
     * @description 心跳健康监控
     * @throws
     * @return void
     * @date 2025-08-10 19:52:24
     * @author wangkun
     **/
    private void checkheartbeats() {
        main_lock.lock();
        try {
            long currenttime = system.currenttimemillis();
            new arraylist<>(lastpongtimes.keyset()).foreach(session -> {
                long lastpong = lastpongtimes.get(session);
                if (lastpong != null && currenttime - lastpong > heartbeat_timeout) {
                    log.info("[心跳超时] 会话{}:  最后pong{}:  前 ", session.getid(), currenttime - lastpong);
                    closesession(session);
                }
            });
        } finally {
            main_lock.unlock();
        }
    }

    /**
     * @param session
     * @description 检验session
     * @throws
     * @return boolean
     * @date 2025-08-10 19:53:35
     * @author wangkun
     **/
    private boolean validatesession(websocketsession session) {
        if (!session.isopen()) {
            removetosession(session);
            return true;
        }
        return false;
    }

    /**
     * @param session
     * @description 关闭session, 会话结束
     * @throws
     * @return void
     * @date 2025-08-10 19:54:07
     * @author wangkun
     **/
    private void closesession(websocketsession session) {
        try {
            if (session.isopen()) {
                session.close(closestatus.session_not_reliable);
            }
        } catch (exception e) {
            log.info("[关闭异常] 会话{}:  原因{}: ", session.getid(), e.getmessage());
        } finally {
            removetosession(session);
        }
    }

    /**
     * @param
     * @description 清理无效会话
     * @throws
     * @return void
     * @date 2025-08-10 19:55:08
     * @author wangkun
     **/
    private void cleanexpiredsessions() {
        main_lock.lock();
        try {
            new arraylist<>(session_map.keyset()).foreach(session -> {
                if (!session.isopen()) {
                    removetosession(session);
                }
            });
        } finally {
            main_lock.unlock();
        }
    }

}

websocket工具类封装

(springutils就是一个自己封装获取spring上下文的工具)

/**
 * @description websocket工具类
 * @author wangkun
 * @date 2025/8/11 10:08
 * @version
 */
@component
public class websocketutils {

    /**
     * websocket处理器资源
     **/
    private static final websocketmanager web_socket_manager = springutils.getbean(websocketmanager.class);


    /**
     * @param groupid
     * @param id
     * @description 添加至分组
     * @throws
     * @return boolean
     * @date 2025-08-11 10:14:27
     * @author wangkun
     **/
    public static boolean addtogroup(string id, string groupid) {
        return web_socket_manager.addtogroup(id, groupid);
    }

    /**
     * @param id
     * @param groupid
     * @description 移除分组
     * @throws
     * @return boolean
     * @date 2025-08-11 10:44:47
     * @author wangkun
     **/
    public static boolean removetogroup(string id, string groupid) {
        return web_socket_manager.removetogroup(id, groupid);
    }

    /**
     * @param id
     * @param message
     * @description 指定单发消息
     * @throws
     * @return boolean
     * @date 2025-08-11 11:03:14
     * @author wangkun
     **/
    public static boolean sendmessage(string id, string message) {
        return web_socket_manager.sendmessages(id, message);
    }

    /**
     * @param groupid
     * @param message
     * @description 指定分组发消息
     * @throws
     * @return boolean
     * @date 2025-08-11 11:09:22
     * @author wangkun
     **/
    public static boolean sendgroupmessages(string groupid, string message) {
        return web_socket_manager.sendgroupmessages(groupid, message);
    }

    /**
     * @param message
     * @description 全局广播发消息
     * @throws
     * @return boolean
     * @date 2025-08-11 11:16:54
     * @author wangkun
     **/
    public static boolean sendglobalmessages(string message) {
        return web_socket_manager.sendglobalmessages(message);
    }
}

websocket控制器

/**
 * @description websocket控制器
 * @author wangkun
 * @date 2025/8/10 19:59
 * @version
 */
@restcontroller
@requestmapping("/api/websocket")
public class websocketcontroller {

    private final websocketmanager manager;

    /**
     * @param manager
     * @description 构造函数,通过依赖注入方式初始化websocket管理器
     * @throws
     * @return
     * @date 2025-08-21 09:25:44
     * @author wangkun
     **/
    public websocketcontroller(websocketmanager manager) {
        this.manager = manager;
    }

    /**
     * @param
     * @description 测试连接地址
     * @throws
     * @return java.lang.string
     * @date 2025-08-10 19:59:26
     * @author wangkun
     **/
    @getmapping("/test") // 处理get请求,用于测试websocket连接
    public string testconnection() {
        websocketutils.addtogroup("test", "test");
        websocketutils.sendglobalmessages("压测");
        return "ws://localhost:8099/ws/{id}";
    }

    /**
     * @param id      用户id
     * @param groupid 分组id
     * @description 添加到分组
     * @throws
     * @return java.lang.string
     * @date 2025-08-10 19:59:42
     * @author wangkun
     **/
    @postmapping("/addtogroup")
    public string addtogroup(@requestparam string id, @requestparam string groupid) {
        boolean flag = manager.addtogroup(id, groupid);
        return "[" + id + "]添加到分组[" + groupid + "]:" + flag;
    }

    /**
     * @param id      用户id
     * @param groupid 分组id
     * @description 从分组移除
     * @throws
     * @return java.lang.string
     * @date 2025-08-10 19:59:48
     * @author wangkun
     **/
    @postmapping("/removetogroup")
    public string removetogroup(@requestparam string id, @requestparam string groupid) {
        boolean flag = manager.removetogroup(id, groupid);
        return "[" + id + "]从分组[" + groupid + "]移除:" + flag;
    }

    /**
     * @param id      用户id
     * @param message 要发送的消息内容
     * @description 发送消息
     * @throws
     * @return java.lang.string
     * @date 2025-08-10 19:59:56
     * @author wangkun
     **/
    @postmapping("/sendmessages")
    public string sendmessages(@requestparam string id, @requestparam string message) {
        boolean success = manager.sendmessages(id, message);
        return success ? "发送成功" : "发送失败(无活跃连接)";
    }

    /**
     * @param groupid 分组id
     * @param message 要发送的消息内容
     * @description 给分组发送消息
     * @throws
     * @return java.lang.string
     * @date 2025-08-10 20:00:06
     * @author wangkun
     **/
    @postmapping("/sendgroupmessages")
    public string sendtogroup(@requestparam string groupid, @requestparam string message) {
        boolean success = manager.sendgroupmessages(groupid, message);
        return success ? "发送成功" : "发送失败(分组不存在或无成员)";
    }

    /**
     * @param message 要广播的消息内容
     * @description 全局广播
     * @throws
     * @return java.lang.string
     * @date 2025-08-10 20:00:13
     * @author wangkun
     **/
    @postmapping("/sendglobalmessages")
    public string broadcast(@requestparam string message) {
        boolean success = manager.sendglobalmessages(message);
        return success ? "广播成功" : "部分发送失败";
    }
}

测试地址

ws://localhost:8099/ws/user1

剩下的测试地址都在控制中。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2026  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com