当前位置: 代码网 > it编程>编程语言>Java > Springboot集成websocket

Springboot集成websocket

2024年08月01日 Java 我要评论
Springboot集成websocket

springboot集成websocket

1. websocket概述

  • 全双工通讯模式允许双方同时进行双向通讯,如手机通话。
  • 半双工通讯模式允许双方交替发送和接收信息,但不能同时通讯,如对讲机。
  • 单工通讯模式只能单向传输信息,不能回复,如广播电台。

2. websocket原理


websocket协议的核心特点包括:

  • 全双工通信:客户端和服务器可以同时发送和接收消息。
  • 持久连接:一旦建立连接,就可以持续进行数据交换,无需像http那样频繁地建立新的连接。
  • 低延迟:由于连接是持久的,数据可以几乎实时地发送和接收。
  • 轻量级协议:websocket协议的头部信息非常简单,减少了数据传输的开销。

3. spring boot集成websocket

在spring boot中集成websocket非常简单,spring提供了对websocket的原生支持。以下是一个基本的集成步骤:

3.1 添加依赖

pom.xml中添加spring boot的websocket依赖:

<dependency>
    <groupid>org.springframework.boot</groupid>
    <artifactid>spring-boot-starter-websocket</artifactid>
</dependency>

3.2 创建websocket配置类

创建一个配置类来启用和配置websocket:

package com.jiayuan.common.config;

import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.web.socket.server.standard.serverendpointexporter;
import org.springframework.web.socket.server.standard.servletservercontainerfactorybean;

/**
 * @title: websocketconfig
 * @package: com.jiayuan.common.config
 * @description: websocket配置
 * @author: xmc
 * @date: 创建时间 2024-04-24
 */
@configuration
public class websocketconfig {

    /**
     * 自动注册使用了@serverendpoint注解声明的websocket endpoint
     *
     * @return
     */

    @bean
    public serverendpointexporter serverendpointexporter() {
        return new serverendpointexporter();
    }

    /**
     * 通信文本消息和二进制缓存区大小
     * 避免对接 第三方 报文过大时,websocket 1009 错误
     *
     * @return
     */

    @bean
    public servletservercontainerfactorybean createwebsocketcontainer() {
        servletservercontainerfactorybean container = new servletservercontainerfactorybean();
        // 在此处设置buffersize
        container.setmaxtextmessagebuffersize(10240000);
        container.setmaxbinarymessagebuffersize(10240000);
        container.setmaxsessionidletimeout(15 * 60000l);
        return container;
    }
}
 

3.3 创建消息处理器

package com.jiayuan.common.config;

import cn.hutool.core.collection.collectionutil;
import cn.hutool.extra.spring.springutil;
import cn.hutool.json.jsonutil;
import com.alibaba.fastjson2.json;
import com.alibaba.fastjson2.jsonobject;
import com.baomidou.mybatisplus.core.toolkit.stringutils;
import com.jiayuan.common.redis.rediscache;
import com.jiayuan.modules.critical.dto.cvrecordextradto;
import com.jiayuan.modules.critical.dto.syncrecorddto;
import com.jiayuan.modules.critical.service.cvrecordservice;
import lombok.requiredargsconstructor;
import lombok.extern.slf4j.slf4j;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.component;

import javax.annotation.resource;
import javax.websocket.*;
import javax.websocket.server.pathparam;
import javax.websocket.server.serverendpoint;
import java.io.ioexception;
import java.util.list;
import java.util.concurrent.concurrenthashmap;

/**
 * @title: websocketserver
 * @package: com.jiayuan.common.config
 * @description: websocket的服务端
 * @author: xmc
 * @date: 创建时间 2024-04-24
 */
@component
@slf4j
@serverendpoint("/api/pushmessage/{userid}")
public class websocketserver {

    /**
     * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
     */
    private static int onlinecount = 0;
    /**
     * concurrent包的线程安全set,用来存放每个客户端对应的websocket对象。
     */
    private static concurrenthashmap<string, websocketserver> websocketmap = new concurrenthashmap<>();
    /**
     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    private session session;
    /**
     * 接收userid
     */
    private string userid = "";

    /**
     * 连接建立成
     * 功调用的方法
     */
    @onopen
    public void onopen(session session, @pathparam("userid") string userid) {
        this.session = session;
        this.userid = userid;
        if (websocketmap.containskey(userid)) {
            websocketmap.remove(userid);
            //加入set中
            websocketmap.put(userid, this);
        } else {
            //加入set中
            websocketmap.put(userid, this);
            //在线数加1
            addonlinecount();
        }
        log.info("用户连接:" + userid + ",当前在线人数为:" + getonlinecount());
        sendmessage("连接成功");
    }

    /**
     * 连接关闭
     * 调用的方法
     */
    @onclose
    public void onclose() {
        if (websocketmap.containskey(userid)) {
            websocketmap.remove(userid);
            //从set中删除
            subonlinecount();
        }
        log.info("用户退出:" + userid + ",当前在线人数为:" + getonlinecount());
    }

    /**
     * 收到客户端消
     * 息后调用的方法
     *
     * @param message 客户端发送过来的消息
     **/
    @onmessage
    public void onmessage(string message, session session) {
        log.info("用户消息:" + userid + ",报文:" + message);
        //可以群发消息
        //消息保存到数据库、redis
        if (stringutils.isnotblank(message)) {
            try {
                //解析发送的报文
                jsonobject jsonobject = json.parseobject(message);
                //追加发送人(防止串改)
                jsonobject.put("fromuserid", this.userid);
                string touserid = jsonobject.getstring("touserid");
                //传送给对应touserid用户的websocket
                if (stringutils.isnotblank(touserid) && websocketmap.containskey(touserid)) {
                    websocketmap.get(touserid).sendmessage(message);
                } else {
                    //否则不在这个服务器上,发送到mysql或者redis
                    log.error("请求的userid:" + touserid + "不在该服务器上");
                }
            } catch (exception e) {
                e.printstacktrace();
            }
        }
    }


    /**
     * @param session
     * @param error
     */
    @onerror
    public void onerror(session session, throwable error) {

        log.error("用户错误:" + this.userid + ",原因:" + error.getmessage());
        error.printstacktrace();
    }

    /**
     * 实现服务
     * 器主动推送
     */
    public void sendmessage(string message) {
        try {
            this.session.getbasicremote().sendtext(message);
        } catch (ioexception e) {
            e.printstacktrace();
        }
    }

    /**
     * 发送自定
     * 义消息
     **/
    public static void sendinfo(string message, string userid) {
        log.info("发送消息到:" + userid + ",报文:" + message);
        if (stringutils.isnotblank(userid) && websocketmap.containskey(userid)) {
            websocketmap.get(userid).sendmessage(message);
        } else {
            log.error("用户" + userid + ",不在线!");
        }
    }

    /**
     * 获得此时的
     * 在线人数
     *
     * @return
     */
    public static synchronized int getonlinecount() {
        return onlinecount;
    }

    /**
     * 在线人
     * 数加1
     */
    public static synchronized void addonlinecount() {
        websocketserver.onlinecount++;
    }

    /**
     * 在线人
     * 数减1
     */
    public static synchronized void subonlinecount() {
        websocketserver.onlinecount--;
    }

}
 

上述代码可能会有疑问:sessionuserid两个字段是不是安全的?多人连接,后面的会不会覆盖掉前面sessionuserid

答案是不会的,关于处理器websocketserver我们要明确以下几点:

3.4 服务器主动给客户端发送消息

 websocketserver.sendinfo("服务器主动给客户端发送消息test", "zhangsan");

4. 使用apipost测试websocket

以下是如何使用apipost进行测试的步骤:

  1. 新建一个websocket测试

  1. 填写urlws://localhost:8080/cvms-api/api/pushmessage/3

注意以下几点:

  • 协议是ws,加密方式请选择wss
  • 选择raw
  • url的拼接公式如下
# servlet.context-path 这个是application.yml中的配置
ws://ip:port//${servlet.context-path}/注解@serverendpoint的值
  • 有权限验证的,比如说shiro权限验证,url就需要加入白名单
 filtermap.put("/api/pushmessage/*", "anon");

5. 参考和感谢

springboot2.0集成websocket,实现后台向前端推送信息

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com