当前位置: 代码网 > it编程>编程语言>Java > WebSocket(java版)服务核心实例代码

WebSocket(java版)服务核心实例代码

2025年12月21日 Java 我要评论
说明:这是一个使用 java jdk 8 和 spring boot 实现的websocket演示项目。目的是为解决多端消息通讯的问题。websocket 是一种基于 tcp 的全双工通信协议,核心作

说明:

这是一个使用 java jdk 8 和 spring boot 实现的websocket演示项目。目的是为解决多端消息通讯的问题。

websocket 是一种基于 tcp 的全双工通信协议,核心作用是解决传统 http 协议 “请求 - 响应” 模式的局限性,实现 客户端与服务器之间的实时、双向、低延迟数据传输

源码地址:https://gitee.com/lqh4188/web-socket

一、功能介绍

功能特性:

  • 基于 maven 的 spring boot 项目骨架。
  • 纯 websocket 端点 /ws ,支持用户隔离,http:使用ws,https:使用wss。
  • 支持分片设置和缓冲区大小设置,解决传输内容限制
  • 提供静态测试页面 index.html ,用于连接、发送消息、查看消息。

项目结构:

  • pom.xml :spring boot 3.3,依赖 spring-boot-starter-web 和 spring-boot-starter-websocket 。
  • src/main/java/com/example/websocket/websocketapplication.java :应用入口。
  • src/main/java/com/example/websocket/websocketconfig.java :注册 websocket 处理器,端点为 /ws 。
  • src/main/java/com/example/websocket/chatwebsockethandler.java :文本消息处理,广播到所有会话。
  • src/main/resources/static/index.html :页面内置 js,连接 ws://{host}/ws ,可发送、显示消息。

关键代码位置

  • 启动类: src/main/java/com/example/websocket/websocketapplication.java:1
  • websocket 配置: src/main/java/com/example/websocket/websocketconfig.java:1
  • 文本消息处理器: src/main/java/com/example/websocket/chatwebsockethandler.java:1
  • 静态页面: src/main/resources/static/index.html:1

测试连接

  • 打开 http://localhost:8800 ,使用页面上的“连接/发送”测试
  • websocket 地址: ws://localhost:8080/ws

二、运行测试

可通过userid来创建独立的联接,进行用户隔离

三、核心代码说明

由于websocket对传输的内容有限制,若内容较大可进行缓冲区大小设置,并对不同文本进行分片处理

chatwebsockethandler.java代码:

package com.example.websocket;
import java.io.bytearrayoutputstream;
import java.net.uri;
import java.nio.bytebuffer;
import java.nio.charset.standardcharsets;
import java.util.map;
import java.util.set;
import java.util.concurrent.concurrenthashmap;

import org.springframework.web.socket.binarymessage;
import org.springframework.web.socket.closestatus;
import org.springframework.web.socket.textmessage;
import org.springframework.web.socket.websocketsession;
import org.springframework.web.socket.handler.abstractwebsockethandler;
import com.fasterxml.jackson.databind.objectmapper;

public class chatwebsockethandler extends abstractwebsockethandler {
    private final concurrenthashmap<string, set<websocketsession>> usersessions = new concurrenthashmap<>();
    private static final objectmapper mapper = new objectmapper();
    private final concurrenthashmap<string, stringbuilder> textfragments = new concurrenthashmap<>();
    private final concurrenthashmap<string, bytearrayoutputstream> binaryfragments = new concurrenthashmap<>();

    @override
    public void afterconnectionestablished(websocketsession session) throws exception {
        // 验证用户id的有效性
        string uid = resolveuserid(session);
        if (uid == null || uid.isempty()) {
            session.close(closestatus.bad_data);
            return;
        }
        session.getattributes().put("userid", uid);
        //多会话管理
        usersessions.computeifabsent(uid, k -> concurrenthashmap.newkeyset()).add(session);
    }

    @override
    protected void handletextmessage(websocketsession session, textmessage message) throws exception {
        // 分片处理
        string id = session.getid();
        if (!message.islast()) {
            textfragments.computeifabsent(id, k -> new stringbuilder()).append(message.getpayload());
            return;
        }
        stringbuilder sb = textfragments.remove(id);
        string payload = sb != null ? sb.append(message.getpayload()).tostring() : message.getpayload();
        routepayload(session, payload);
    }

    @override
    protected void handlebinarymessage(websocketsession session, binarymessage message) throws exception {
        //二进制消息处理
        string id = session.getid();
        bytebuffer buf = message.getpayload();
        byte[] chunk = new byte[buf.remaining()];
        buf.get(chunk);
        bytearrayoutputstream acc = binaryfragments.computeifabsent(id, k -> new bytearrayoutputstream());
        acc.write(chunk);
        if (message.islast()) {
            byte[] all = acc.tobytearray();
            binaryfragments.remove(id);
            string payload = new string(all, standardcharsets.utf_8);
            routepayload(session, payload);
        }
    }

    @override
    public boolean supportspartialmessages() {
        return true;
    }

    @override
    public void afterconnectionclosed(websocketsession session, closestatus status) throws exception {
        //  websocket 连接关闭时的清理逻辑
        object v = session.getattributes().get("userid");
        if (v == null) return;
        string uid = string.valueof(v);
        set<websocketsession> set = usersessions.get(uid);
        if (set != null) {
            set.remove(session);
            if (set.isempty()) usersessions.remove(uid);
        }
    }

    /** 从 websocket 连接的 url 查询参数中提取用户id */
    private string resolveuserid(websocketsession session) {
        uri uri = session.geturi();
        if (uri == null) return null;
        string q = uri.getquery();
        if (q == null || q.isempty()) return null;
        string[] parts = q.split("&");
        for (string p : parts) {
            int i = p.indexof('=');
            if (i > 0) {
                string k = p.substring(0, i);
                string val = p.substring(i + 1);
                if ("userid".equals(k)) return val;
            }
        }
        return null;
    }

    private void routepayload(websocketsession session, string payload) throws exception {
        object v = session.getattributes().get("userid");
        if (v == null) return;
        string fromuid = string.valueof(v);

        // 解析消息
        message message = new message();
        message.setfromuserid(fromuid);
        
        try {
            // 尝试将payload解析为message对象
            message receivedmsg = mapper.readvalue(payload, message.class);
            message.settouserid(receivedmsg.gettouserid());
            message.setcontent(receivedmsg.getcontent());
            message.settype(receivedmsg.gettype());
        } catch (exception e) {
            // 如果解析失败,将整个payload作为content
            message.setcontent(payload);
        }

        string touid = message.gettouserid();
        boolean isp2p = touid != null && !touid.isempty();
        
        set<websocketsession> targets;
        if (isp2p) {
            targets = usersessions.get(touid);
        } else {
            targets = usersessions.get(fromuid);
        }
        
        // 序列化消息对象
        string outstr = mapper.writevalueasstring(message);
        textmessage msg = new textmessage(outstr);
        
        if (targets == null || targets.isempty()) {
            if (session.isopen()) {
                session.sendmessage(msg);
            }
            return;
        }

        for (websocketsession s : targets) {
            if (s.isopen()) {
                s.sendmessage(msg);
            }
        }
        if (isp2p && session.isopen()) {
            session.sendmessage(msg);
        }
    }
}

配置类websocketconfig.java

package com.example.websocket;

import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.web.socket.websockethandler;
import org.springframework.web.socket.config.annotation.enablewebsocket;
import org.springframework.web.socket.config.annotation.websocketconfigurer;
import org.springframework.web.socket.config.annotation.websockethandlerregistry;
import org.springframework.web.socket.server.standard.servletservercontainerfactorybean;

@configuration
@enablewebsocket
public class websocketconfig implements websocketconfigurer {
    @override
    public void registerwebsockethandlers(websockethandlerregistry registry) {
        registry.addhandler(chathandler(), "/ws").setallowedoriginpatterns("*");
    }

    @bean
    public websockethandler chathandler() {
        return new chatwebsockethandler();
    }

    // 配置 websocket 容器参数(解决消息过大、超时等问题)
    @bean
    public servletservercontainerfactorybean createwebsocketcontainer() {
        servletservercontainerfactorybean container = new servletservercontainerfactorybean();
        // 文本消息缓冲区:2mb(解决解码后消息过大的核心配置)
        container.setmaxtextmessagebuffersize(2 * 1024 * 1024);
        // 二进制消息缓冲区:4mb(按需配置)
        container.setmaxbinarymessagebuffersize(4 * 1024 * 1024);
        // 会话空闲超时:60秒(无交互则关闭连接)
        container.setmaxsessionidletimeout(60_000l);
        return container;
    }
}

总结 

到此这篇关于websocket(java版)服务核心代码的文章就介绍到这了,更多相关java websocket服务内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com