当前位置: 代码网 > it编程>编程语言>Java > SpringCloud整合Netty集群实现WebSocket的示例代码

SpringCloud整合Netty集群实现WebSocket的示例代码

2024年11月26日 Java 我要评论
引言在分布式系统中,spring cloud 为微服务架构提供了丰富的功能,而 netty 是一个高性能的网络通信框架。将二者结合实现 socket 集群,可以满足大规模、高性能网络通信的需求,实现前

引言

在分布式系统中,spring cloud 为微服务架构提供了丰富的功能,而 netty 是一个高性能的网络通信框架。将二者结合实现 socket 集群,可以满足大规模、高性能网络通信的需求,实现前后端间高效稳定的通信。

1. 服务注册和发现中心

这里服务注册和发行中心使用nacos为例(需要启动一个nacos服务器)。

微服务注册: 在每一个微服务项目中,添加nacos客户端连接,并在配置文件中指定服务名称和端口。例如:

# tomcat
server:
  port: 9201
  netty:
    port: 10201
    application:
      name: yhy-netty-server

# spring
spring:
  application:
    # 应用名称
    name: soc-dmoasp-system
  profiles:
    # 环境配置
    active: dev
  cloud:
    nacos:
      discovery:
        # 服务注册地址
        server-addr: nacos-registry:8858
      config:
        # 配置中心地址
        server-addr: nacos-registry:8858
        file-extension: yml
        # 共享配置
        shared-configs:
          - data-id: application.${spring.cloud.nacos.config.file-extension}
            refresh: true
          - data-id: soc-dmoasp-redission.${spring.cloud.nacos.config.file-extension}
          - data-id: soc-dmoasp-druid.${spring.cloud.nacos.config.file-extension}

这是一个基本的服务配置。里面关于netty的applicaiton.name和port可以通过nacos的namingservice类手动注册。

1.1. netty服务器搭建

  • 添加netty依赖:在具体微服务中的pom.xml中添加netty依赖:
<dependency>
  <groupid>org.springframework.boot</groupid>
  <artifactid>spring-boot-starter-reactor-netty</artifactid>
</dependency>
  • netty启动类:创建一个nettyserver类,用于启动netty,示例如下:
import com.alibaba.cloud.nacos.nacosdiscoveryproperties;
import com.alibaba.nacos.api.propertykeyconst;
import com.alibaba.nacos.api.naming.namingfactory;
import com.alibaba.nacos.api.naming.namingservice;
import com.alibaba.nacos.api.naming.pojo.instance;
import com.soc.dmoasp.system.server.handler.websocketidlestatehandler;
import io.netty.bootstrap.serverbootstrap;
import io.netty.channel.channelfuture;
import io.netty.channel.channelinitializer;
import io.netty.channel.channelpipeline;
import io.netty.channel.eventloopgroup;
import io.netty.channel.nio.nioeventloopgroup;
import io.netty.channel.socket.socketchannel;
import io.netty.channel.socket.nio.nioserversocketchannel;
import io.netty.handler.codec.http.httpobjectaggregator;
import io.netty.handler.codec.http.httpservercodec;
import io.netty.handler.codec.http.websocketx.websocketserverprotocolhandler;
import io.netty.handler.stream.chunkedwritehandler;
import org.slf4j.logger;
import org.slf4j.loggerfactory;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.beans.factory.annotation.value;
import org.springframework.boot.commandlinerunner;
import org.springframework.scheduling.annotation.async;
import org.springframework.stereotype.component;

import javax.annotation.predestroy;
import java.net.inetaddress;
import java.util.properties;

/**
 * netty服务
 * @author dsw
 * @date 2024/11/18 17:34
 */
@component
public class nettyserver implements commandlinerunner {

    logger log = loggerfactory.getlogger(nettyserver.class);

    @autowired
    private nacosdiscoveryproperties nacosdiscoveryproperties;


    @value("${server.netty.port}")
    private integer nettyport;
    @value("${server.netty.application.name}")
    private string nettyname;

    eventloopgroup bossgroup;

    eventloopgroup workgroup;

    @override
    public void run(string... args) throws exception {
        log.info("初始化netty配置开始");
        //netty 服务端启动的端口不可和springboot启动类的端口号重复
        this.start();
        //关闭服务器的时候同时关闭netty服务
        runtime.getruntime().addshutdownhook(new thread(() -> {
            try {
                this.destroy();
            } catch (interruptedexception e) {
                log.error(e.getmessage());
            }
        }));
    }

    @async
    public void start() throws interruptedexception {
        try {
            bossgroup = new nioeventloopgroup(1);
            workgroup = new nioeventloopgroup(10);
            serverbootstrap bootstrap = new serverbootstrap();
            // bossgroup辅助客户端的tcp连接请求, workgroup负责与客户端之前的读写操作
            bootstrap.group(bossgroup, workgroup)
            // 指定channel
            .channel(nioserversocketchannel.class)
            .childhandler(new channelinitializer<socketchannel>() {
                @override
                protected void initchannel(socketchannel socketchannel) throws exception {
                    channelpipeline pipeline = socketchannel.pipeline();
                            pipeline
                                    //http解码
                                    .addlast(new httpservercodec())
                                    .addlast(new chunkedwritehandler())
                                    //http段聚合
                                    .addlast(new httpobjectaggregator(1024*1024))
                                    //将http协议转成ws协议
                                    .addlast(new websocketserverprotocolhandler("/socket"))

                            ;

                        }
                    });
            registernamingservice(nettyname,nettyport);
            // 配置完成,开始绑定server,通过调用sync同步方法阻塞直到绑定成功
            channelfuture future = bootstrap.bind(nettyport).sync();
            if (future.issuccess()) {
                log.info("server started and listen on:{}", future.channel().localaddress());
                log.info("启动 netty server");
            }
        } catch (interruptedexception e) {
            log.error("netty异常:{}", e.getmessage());
        }
    }
    /**
     * 将netty服务注册进nacos
     *
     * @param nettyname 服务名称
     * @param nettyport 服务端口号
     */
    private void registernamingservice(string nettyname, integer nettyport) {
        try {
            properties properties = new properties();
            properties.setproperty(propertykeyconst.server_addr, nacosdiscoveryproperties.getserveraddr());
            properties.setproperty(propertykeyconst.namespace, nacosdiscoveryproperties.getnamespace());
            properties.setproperty(propertykeyconst.username, nacosdiscoveryproperties.getusername());
            properties.setproperty(propertykeyconst.password, nacosdiscoveryproperties.getpassword());
            namingservice namingservice = namingfactory.createnamingservice(properties);
            inetaddress address = inetaddress.getlocalhost();
            // 定义服务实例信息
            instance instance = new instance();
            instance.setip(address.gethostaddress());
            instance.setport(nettyport);
            instance.setweight(1.0);
            instance.sethealthy(true);
            namingservice.registerinstance(nettyname, nacosdiscoveryproperties.getgroup(), instance);
        } catch (exception e) {
            throw new runtimeexception(e);
        }
    }

    /**
     * 释放资源
     */
    @predestroy
    public void destroy() throws interruptedexception {
        if (bossgroup != null) {
            bossgroup.shutdowngracefully().sync();
        }
        if (workgroup != null) {
            workgroup.shutdowngracefully().sync();
        }
        log.info("关闭netty");

    }

}

使用commandlinerunner接口实现run方法在启动项目的时候把netty服务带起。

bossgroup workgroup 的角色区别

  • bossgroup(老板组)

    主要职责是负责监听服务器端的端口,等待新的客户端连接请求到来。它就像是公司里负责接待新客户的前台人员,当有新客户(客户端)想要连接到服务器时,bossgroup中的eventloop会接收到这个连接请求。

    一般情况下,bossgroup只需要配置较少数量的eventloop就可以满足需求,因为它主要处理的是连接建立的初期阶段,即接受新连接这个相对不那么频繁的操作(相比于后续处理大量数据传输等操作)。通常会设置为 1 个eventloop或者根据服务器的具体性能和预期的连接请求频率适当增加数量,但总体数量相对较少。

  • workgroup(工作组)

    一旦bossgroup接受了新的客户端连接,就会把这个新连接交给workgroup来进一步处理后续的所有与该连接相关的操作,比如读取客户端发送的数据、向客户端发送响应数据等。它就像是公司里负责具体为客户办理业务的工作人员。

    workgroup需要处理大量的实际业务数据传输和交互工作,所以通常会根据服务器的性能和预期要处理的并发连接数量等因素,配置相对较多数量的eventloop。例如,在处理高并发场景时,可能会配置几十甚至上百个eventloop来确保能够高效地处理众多客户端连接的各种业务操作。

registernamingservice方法

这时候可以看到我们nacos配置中配置了server.netty.portserver.netty.application.name这两个参数分别对应netty的端口和netty的微服务应用名。

registernamingservice方法用于往nacos中注册服务,这里通过namingservice类的registerinstance方法将netty服务注册进nacos中。

1.2. gateway网关转发

微服务中所有的请求都是由网关转发,这里使用gateway转发。

# spring配置
  spring:
    cloud:
      gateway:
        discovery:
          locator:
            lowercaseserviceid: true
            enabled: true
        routes:
          # 系统模块
          - id: soc-dmoasp-system
            uri: lb://soc-dmoasp-system
            predicates:
              - path=/system/**
            filters:
              - stripprefix=1
          # netty服务
          - id:  netty-server
            uri: lb:ws://soc-netty-server
            predicates:
              - path=/netty-server/**
            filters:
              - stripprefix=1
#不需要进行权限校验的uri
security:
  ignore:
    whites:
      - /auth/logout
      - /auth/login
      - /auth/register
      - /*/v2/api-docs
      - /csrf
      #netty连接地址
      - /netty-server/**

配置文件中添加netty路由,在鉴权网关中需要将socket地址放行,不进行权限验证。例如:

@component
@refreshscope
public class authfilter implements globalfilter, ordered
{
    private static final logger log = loggerfactory.getlogger(authfilter.class);

    // 排除过滤的 uri 地址,nacos自行添加
    @autowired
    private ignorewhiteproperties ignorewhite;

    @override
    public mono<void> filter(serverwebexchange exchange, gatewayfilterchain chain)
    {
        serverhttprequest request = exchange.getrequest();
        serverhttprequest.builder mutate = request.mutate();

        string url = request.geturi().getpath();
        // 跳过不需要验证的路径
        if (stringutils.matches(url, ignorewhite.getwhites()))
        {
            return chain.filter(exchange);
        }
        ......
    }
}

启动gateway和system模块

启动完成后system模块会打印nettyserver输出的启动日志,nacos中也会有手动注册的netty服务。

通过ws://127.0.0.1:8080/netty-server/socket就可以直接连接上netty服务器(8080为gateway的端口)。

2. 鉴权、心跳、客户端与服务端之间的通信

2.1. 鉴权

创建authhandler类,继承simplechannelinboundhandler类重写channelread0方法,channelread0中可以监听到客户端往服务端发送的消息。 例如:

import com.alibaba.fastjson2.json;
import com.alibaba.fastjson2.jsonobject;
import com.soc.dmoasp.common.core.constant.cacheconstants;
import com.soc.dmoasp.common.core.constant.tokenconstants;
import com.soc.dmoasp.common.core.enums.nettymsgenum;
import com.soc.dmoasp.common.core.utils.jwtutils;
import com.soc.dmoasp.common.core.utils.stringutils;
import com.soc.dmoasp.common.redis.service.redisservice;
import com.soc.dmoasp.system.server.vo.nettyresult;
import io.jsonwebtoken.claims;
import io.netty.channel.channelhandlercontext;
import io.netty.channel.simplechannelinboundhandler;
import io.netty.handler.codec.http.websocketx.textwebsocketframe;
import io.netty.util.attributekey;
import org.slf4j.logger;
import org.slf4j.loggerfactory;
import org.springframework.data.redis.core.redistemplate;

/**
 * netty鉴权处理
 * @author dsw
 * @date 2024/11/18 17:55
 */
public class authhandler extends simplechannelinboundhandler<textwebsocketframe> {

    logger log = loggerfactory.getlogger(authhandler.class);

    private final redistemplate<string, object> redistemplate;

    private final redisservice redisservice;

    public authhandler(redistemplate<string, object> stringobjectredistemplate, redisservice redisservice) {
        redistemplate = stringobjectredistemplate;
        this.redisservice = redisservice;
    }

    @override
    protected void channelread0(channelhandlercontext ctx, textwebsocketframe textwebsocketframe) throws exception {
        try {
            jsonobject clientmessage = json.parseobject(textwebsocketframe.text());
            //获取code 只判断code为鉴权的消息类型
            integer code = clientmessage.getinteger("code");
            if (nettymsgenum.auth_message.getcode().equals(code)) {
                //获取token
                string token = clientmessage.getstring("token");
                if (stringutils.isempty(token))
                {
                    ctx.channel().writeandflush(nettyresult.authfail("令牌不能为空"));
                    ctx.close();
                }
                // 如果前端设置了令牌前缀,则裁剪掉前缀
                if (stringutils.isnotempty(token) && token.startswith(tokenconstants.prefix))
                {
                    token = token.replacefirst(tokenconstants.prefix, stringutils.empty);
                }
                //jwt校验
                claims claims = jwtutils.parsetoken(token);
                if (claims == null)
                {
                    ctx.channel().writeandflush(nettyresult.authfail("令牌已过期或验证不正确"));
                    ctx.close();
                }
                string userkey = jwtutils.getuserkey(claims);
                //从redis中查看是否有这个token没有则不能登录
                boolean islogin = redisservice.haskey(gettokenkey(userkey));
                if (!islogin)
                {
                    ctx.channel().writeandflush(nettyresult.authfail("登录状态已过期"));
                    ctx.close();
                }
                //获取用户保存至socket连接会话中
                string userid = jwtutils.getuserid(claims);
                attributekey<string> useridkey = attributekey.valueof("userid");
                ctx.channel().attr(useridkey).setifabsent(userid);
                jsonobject jsonobject = new jsonobject();
                jsonobject.put("userid",userid);
                log.info("有新的socket客户端链接 userid :{}", userid);
                //将连接信息保存至redis中key为userid value为ctx.channel().id()
                redistemplate.opsforhash().put(cacheconstants.getuserchannelkey(),userid,ctx.channel().id());
                ctx.channel().writeandflush(nettyresult.success(nettymsgenum.auth_message.getcode(), "鉴权成功", jsonobject));
                //鉴权完成后移除authhandler消息监听
                ctx.pipeline().remove(authhandler.class);
            } else {
                ctx.channel().writeandflush(nettyresult.authfail("请先鉴权,在发送其他类型请求!"));
                ctx.close();
            }
        } catch (exception e) {
            log.error(e.getmessage());
            ctx.channel().writeandflush(nettyresult.authfail("鉴权失败"));
            ctx.close();
        }
    }

    /**
     * 获取缓存key
     */
    private string gettokenkey(string token)
    {
        return cacheconstants.login_token_key + token;
    }
}

泛型textwebsocketframe表示接收文本类型的消息。

其中连接的用户信息保存到redis中,redistemplate<string, object> redistemplate对象是用来保存netty连接信息的,序列化使用的是string(用户信息用string存储,使用json序列化会反序列化失败),对应接收的json串如下:

{"code":1001,token:"bearer xxxx"}

code取nettymsgenum中的code,token则是登录时生成的token令牌。

鉴权后将authhandler移除,会话后续的消息交互不在进authhandler。

nettymsgenum如下:

/**
 * netty消息类型枚举
 * @author dsw
 * @date 2024/11/18 17:58
 */
public enum nettymsgenum {

    auth_message(1001, "鉴权消息","auth-netty"),
    //{'code':1003,'data':{'unreadcount':0}}
    notice_message(1003, "公告通知消息","notice-netty"),
    heart_message(1006, "心跳消息","heart-netty"),
    error_message(-1, "错误",null);

    private final integer code;

    private final string info;

    private final string strategyname;

    nettymsgenum(integer code, string info, string strategyname){
        this.code = code;
        this.info = info;
        this.strategyname = strategyname;
    }

    public static nettymsgenum getbycode(integer code) {
        for (nettymsgenum msgenum : values()) {
            if (msgenum.getcode().equals(code)) {
                return msgenum;
            }
        }
        return error_message;
    }

    public integer getcode() {
        return code;
    }

    public string getinfo() {
        return info;
    }

    public string getstrategyname() {
        return strategyname;
    }
}

nettyresult如下:

import com.alibaba.fastjson2.json;
import io.netty.handler.codec.http.websocketx.textwebsocketframe;

import java.io.serializable;

/**
 * netty响应实体
 * @author dsw
 * @date 2024/11/18 18:02
 */
public class nettyresult  implements serializable {

    private static final long serialversionuid = 1l;

    private integer code;

    private string message;

    private object data;

    public nettyresult(integer code, string message, object data) {
        this.code = code;
        this.message = message;
        this.data = data;
    }


    public static textwebsocketframe fail(string message) {
        return new textwebsocketframe(json.tojsonstring(new nettyresult(-1, message, null)));
    }

    public static textwebsocketframe authfail(string message) {
        return new textwebsocketframe(json.tojsonstring(new nettyresult(-2, message, null)));
    }

    public static textwebsocketframe success( string message) {
        return new textwebsocketframe(json.tojsonstring(new nettyresult(200, message, null)));
    }

    public static textwebsocketframe success(integer code, object data) {
        return new textwebsocketframe(json.tojsonstring(new nettyresult(code,null, data)));
    }

    public static textwebsocketframe success(integer code, string message, object data) {
        return new textwebsocketframe(json.tojsonstring(new nettyresult(code,message, data)));
    }

    public integer getcode() {
        return code;
    }

    public string getmessage() {
        return message;
    }

    public object getdata() {
        return data;
    }
}

最后到nettyserver中的channelinitializer加入authhandler:

我们重新项目后连接socket查看结果

如果不鉴权直接发送消息,服务端会主动断开连接,客户端需要重连。

这就代表已经连接成功了。

2.2. 空闲检测

创建websocketidlestatehandler类继承idlestatehandler类,重写channelidle方法。

import io.netty.channel.channel;
import io.netty.channel.channelhandlercontext;
import io.netty.handler.timeout.idlestate;
import io.netty.handler.timeout.idlestateevent;
import io.netty.handler.timeout.idlestatehandler;
import org.slf4j.logger;
import org.slf4j.loggerfactory;

import java.util.concurrent.timeunit;

/**
 * 空闲检测
 * @author dsw
 * @date 2024/11/18 11:47
 */
public class websocketidlestatehandler extends idlestatehandler {

    logger log = loggerfactory.getlogger(websocketidlestatehandler.class);

    /**
     * 默认的读空闲时间
     */
    private static final int default_reader_idle_time = 10;

    /**
     * 默认10秒读空闲断开客户端
     */
    public websocketidlestatehandler() {
        super(default_reader_idle_time, 0, 0, timeunit.seconds);
    }

    /**
     * 指定心跳时间(秒)
     *
     * @param readeridletimeseconds 读空闲时间
     * @param writeridletimeseconds 写空闲时间
     * @param allidletimeseconds    读写空闲时间
     */
    public websocketidlestatehandler(int readeridletimeseconds, int writeridletimeseconds, int allidletimeseconds) {
        super(readeridletimeseconds, writeridletimeseconds, allidletimeseconds, timeunit.seconds);
    }

    /**
     * 指定心跳时间及时间单位
     *
     * @param readeridletime 读空闲时间
     * @param writeridletime 写空闲时间
     * @param allidletime    读写空闲时间
     * @param unit           时间单位
     */
    public websocketidlestatehandler(long readeridletime, long writeridletime, long allidletime, timeunit unit) {
        super(readeridletime, writeridletime, allidletime, unit);
    }

    /**
     * 当空闲事件触发时执行
     */
    @override
    protected void channelidle(channelhandlercontext ctx, idlestateevent evt) throws exception {
        //如果是读空闲
        if (evt.state().equals(idlestate.reader_idle)) {
            channel channel = ctx.channel();
            log.debug("服务端未检测到客户端【{}】的心跳包,强制关闭客户端!", channel.id());
            channel.close();
        }
        super.channelidle(ctx,evt);
    }
}

以上实现了父类的构造函数,可以指定具体的空闲时间。当空闲时会触发channelidle方法,则服务端主动断开连接。

最后到nettyserver中的channelinitializer加入websocketidlestatehandler:

加到最前面。

示例设置了10秒断开,需要使用中自行调整。

2.3. 消息通信

2.3.1. 接收客户端的消息

创建websockethandler类,继承simplechannelinboundhandler类重写channelread0、handleradded、handlerremoved、exceptioncaught方法。

  • channelread0方法:监听客户端发送过来的消息。
  • handleradded方法:websocket连接后会调用,将连接信息添加到通道组
  • handlerremoved方法:断开连接后会调用(服务端、客户端断开都会调用),用于用户下线(删除通道、删除redis中存储的连接信息)
  • exceptioncaught方法:发生异常后调用,发生异常后服务端通常会主动断开连接。
import com.alibaba.fastjson2.json;
import com.alibaba.fastjson2.jsonobject;
import com.soc.dmoasp.common.core.constant.cacheconstants;
import com.soc.dmoasp.common.core.enums.nettymsgenum;
import com.soc.dmoasp.system.server.config.nettyconfig;
import com.soc.dmoasp.system.server.strategy.nettystrategy;
import com.soc.dmoasp.system.server.strategy.nettystrategyfactory;
import com.soc.dmoasp.system.server.vo.nettyresult;
import io.netty.channel.channelhandlercontext;
import io.netty.channel.simplechannelinboundhandler;
import io.netty.handler.codec.http.websocketx.textwebsocketframe;
import io.netty.util.attributekey;
import org.apache.commons.lang3.stringutils;
import org.slf4j.logger;
import org.slf4j.loggerfactory;
import org.springframework.data.redis.core.redistemplate;


/**
 * websocket处理
 * @author dsw
 * @date 2024/11/18 10:20
 */
public class websockethandler extends simplechannelinboundhandler<textwebsocketframe> {

    logger log = loggerfactory.getlogger(websockethandler.class);

    private final redistemplate<string, object> redistemplate;
    
    private final nettystrategyfactory nettystrategyfactory;

    public websockethandler(redistemplate<string, object> redistemplate, nettystrategyfactory nettystrategyfactory) {
        this.redistemplate = redistemplate;
        this.nettystrategyfactory = nettystrategyfactory;
    }

    /**
    * websocket连接创建后调用
    */
    @override
    public void handleradded(channelhandlercontext ctx) throws exception {
        // 添加到channelgroup 通道组
        nettyconfig.getchannelgroup().add(ctx.channel());
    }
    /**
    * 读取数据
    */
    @override
    protected void channelread0(channelhandlercontext ctx, textwebsocketframe frame) throws exception {
        attributekey<string> useridkey = attributekey.valueof("userid");
        string userid = ctx.channel().attr(useridkey).get();
        log.info("收到消息 userid:{} message:{}",userid,frame.text());
        // 接收客户端的消息
        jsonobject pullmessage = json.parseobject(frame.text());
        integer code = pullmessage.getinteger("code");
        // 获取消息类型
        nettystrategy nettystrategy = nettystrategyfactory.getnettystrategy(nettymsgenum.getbycode(code));
        // 处理消息
        textwebsocketframe pushmessage = nettystrategy.execute(pullmessage);
        // 返回处理结果给客户端
        ctx.channel().writeandflush(pushmessage);
    }

    @override
    public void handlerremoved(channelhandlercontext ctx) throws exception {
        attributekey<string> useridkey = attributekey.valueof("userid");
        string userid = ctx.channel().attr(useridkey).get();
        log.info("用户下线了 userid:{}",userid);
        // 删除通道
        removeuserid(ctx);
    }

    @override
    public void exceptioncaught(channelhandlercontext ctx, throwable cause) throws exception {
        ctx.channel().writeandflush(nettyresult.fail("系统错误:" + cause.getmessage()));
    }

    /**
    * 删除用户与channel的对应关系
    */
    private void removeuserid(channelhandlercontext ctx) {
        attributekey<string> useridkey = attributekey.valueof("userid");
        string userid = ctx.channel().attr(useridkey).get();
      if(stringutils.isnotblank(userid)){
         redistemplate.opsforhash().delete(cacheconstants.getuserchannelkey(),userid);
      }
   }
}

这里收到消息后通过一个策略模式进入不同的策略,通过nettymsgenum里面定义的code指定不同的策略类。

strategy和strategyfactory:

import com.alibaba.fastjson2.jsonobject;
import io.netty.handler.codec.http.websocketx.textwebsocketframe;

/**
 * netty接收消息处理策略类
 * @author dsw
 * @date 2024/5/27 10:21
 */
public interface nettystrategy {

    /**
     * 执行添加数值
     *
     * @return
     */
    textwebsocketframe execute(jsonobject message);
}
import com.soc.dmoasp.common.core.enums.nettymsgenum;
import org.slf4j.logger;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.component;

import java.util.map;

/**
 * netty策略工厂
 * @author dsw
 * @date 2024/11/18 10:20
 */
@component
public class nettystrategyfactory {

    logger log = org.slf4j.loggerfactory.getlogger(nettystrategyfactory.class);

    /**
     * 通过spring容器的方式注入
     */
    @autowired
    private map<string, nettystrategy> nettystrategy;

    /**
     * 获取对应策略类
     * @param
     */
    public nettystrategy getnettystrategy(nettymsgenum nettymsgenum){
        if(!nettystrategy.containskey(nettymsgenum.getstrategyname())){
            log.warn("没有对应的消息策略");
            throw new runtimeexception("没有对应的消息策略");
        }
        return nettystrategy.get(nettymsgenum.getstrategyname());
    }
}

我们实现一个心跳消息的策略:

import com.alibaba.fastjson2.jsonobject;
import com.soc.dmoasp.common.core.enums.nettymsgenum;
import com.soc.dmoasp.system.server.strategy.nettystrategy;
import com.soc.dmoasp.system.server.vo.nettyresult;
import io.netty.handler.codec.http.websocketx.textwebsocketframe;
import org.springframework.stereotype.component;

/**
 * netty心跳消息
 * @author dsw
 * @date 2024/5/27 10:25
 */
@component("heart-netty")
public class heartstrategyimpl implements nettystrategy {

    @override
    public textwebsocketframe execute(jsonobject message) {
        string data = message.getstring("data");
        if ("ping".equals(data)) {
            return nettyresult.success(nettymsgenum.heart_message.getcode(), null, "pong");
        }
        return nettyresult.fail("消息格式不正确");
    }
}

添加至channelinitializer后重启查看效果。

2.3.2. 发送消息给客户端

集群下面的netty配置

方案1:使用redis的发布订阅

方案2:使用mq的发布订阅

我们这里使用使用redis的发布订阅实现。

添加redis订阅器:

import com.soc.dmoasp.common.core.constant.cacheconstants;
import com.soc.dmoasp.common.redis.configure.fastjson2jsonredisserializer;
import com.soc.dmoasp.system.server.receiver.pushmsgredisreceiver;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.data.redis.connection.redisconnectionfactory;
import org.springframework.data.redis.listener.patterntopic;
import org.springframework.data.redis.listener.redismessagelistenercontainer;
import org.springframework.data.redis.listener.adapter.messagelisteneradapter;

import java.util.collections;
import java.util.list;

/**
 * redisreceiver 配置
 * @author dsw
 * @date 2024/11/18 12:07
 */
@configuration
public class redisreceiverconfig {

    @bean
    public redismessagelistenercontainer redismessagelistenercontainer(redisconnectionfactory redisconnectionfactory,
                                                                       messagelisteneradapter listeneradapter) {
        redismessagelistenercontainer container = new redismessagelistenercontainer();
        container.setconnectionfactory(redisconnectionfactory);
        list<patterntopic> topics = collections.singletonlist(
            patterntopic.of(cacheconstants.topic.sys_socket_push_topic)
        );
        // 添加订阅者监听类,数量不限.patterntopic定义监听主题,这里监听test-topic主题
        container.addmessagelistener(listeneradapter, topics);
        return container;
    }

    @bean
    @suppresswarnings(value = { "unchecked", "rawtypes" })
    public messagelisteneradapter listeneradapter(pushmsgredisreceiver pushmsgredisreceiver) {
        messagelisteneradapter adapter = new messagelisteneradapter(pushmsgredisreceiver);
        fastjson2jsonredisserializer serializer = new fastjson2jsonredisserializer(object.class);
        adapter.setserializer(serializer);
        return adapter;
    }
}
import com.soc.dmoasp.common.core.constant.cacheconstants;
import com.soc.dmoasp.common.core.exception.asserts;
import com.soc.dmoasp.common.redis.dto.nettymessage;
import com.soc.dmoasp.system.server.config.nettyconfig;
import com.soc.dmoasp.system.server.vo.nettyresult;
import io.netty.channel.channel;
import io.netty.channel.channelid;
import org.slf4j.logger;
import org.slf4j.loggerfactory;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.data.redis.core.redistemplate;
import org.springframework.stereotype.component;

import java.util.objects;

/**
 * netty发送消息接收者
 * @author dsw
 * @date 2024/11/18 12:07
 */
@component
public class pushmsgredisreceiver {
    logger log = loggerfactory.getlogger(pushmsgredisreceiver.class);

    @autowired
    private redistemplate<string,object> stringobjectredistemplate;

    /**
     * redistopic订阅
     * @param nettymessage netty消息
     * @param topic netty消息对应的topic
     */
    public void handlemessage(nettymessage nettymessage, string topic) {

        object channelid = stringobjectredistemplate.opsforhash().get(cacheconstants.getuserchannelkey(), nettymessage.getuserid());
        if (objects.isnull(channelid)) {
            log.warn("推送消息失败,用户不在线! userid:{},msg:{}",nettymessage.getuserid(),nettymessage.getmessage());
            asserts.fail("推送消息失败,用户不在线!");
        }
        channel channel = nettyconfig.getchannelgroup().find((channelid) channelid);
        if(channel!=null){
            channel.writeandflush(nettyresult.success(nettymessage.getcode(),nettymessage.getmessage()));
            log.info("推送消息成功! userid:{},msg:{}",nettymessage.getuserid(),nettymessage.getmessage());
        }else {
            log.warn("推送消息失败,没有找到channel! userid:{},msg:{}",nettymessage.getuserid(),nettymessage.getmessage());
        }
    }
}

发布订阅的机制就是所有集群都会收到消息,收到消息后每个netty集群都去找对应的消息会话通道,如果没找到则说明连接不到当前服务上,找到通道后则可以直接推送。 这里使用stringobjectredistemplate获取用户通道,避免序列化失败。

redisservice中实现发布消息

/**
 * redis消息发布订阅 发布消息
 * @param channel 通道id
 * @param message 消息
 */
@async
public void convertandsend(string channel, object message) {
    redistemplate.convertandsend(channel, message);
}

发布消息的工具类:

import com.alibaba.fastjson2.jsonobject;
import com.soc.dmoasp.common.core.constant.cacheconstants;
import com.soc.dmoasp.common.redis.dto.nettymessage;
import com.soc.dmoasp.common.redis.dto.pushsocketmsgdto;
import org.slf4j.logger;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.data.redis.core.redistemplate;
import org.springframework.stereotype.service;

import java.util.set;

/**
 * netty发送消息给服务端
 * @author dsw
 * @date 2024/11/18 11:47
 */
@service
public class pushsocketmsgservice {

    logger log = org.slf4j.loggerfactory.getlogger(pushsocketmsgservice.class);

    @autowired
    private redistemplate<string, object> stringobjectredistemplate;

    @autowired
    private redisservice redisservice;

    /**
     * 给所有用户发送消息
     * @param msgdto
     */
    public void pushmsgtoall( pushsocketmsgdto msgdto) {
        set<object> keys = stringobjectredistemplate.opsforhash().keys(cacheconstants.getuserchannelkey());
        keys.foreach(key -> this.pushmsgtouser(msgdto.getcode(), key.tostring(), msgdto.getmessage()));
    }

    /**
     * 给指定用户发送消息
     * @param msgdto
     */
    public void pushmsgtouserlist(pushsocketmsgdto msgdto) {
        for(long userid : msgdto.getuseridlist()){
            this.pushmsgtouser(msgdto.getcode(), userid.tostring(), msgdto.getmessage());
        }
    }

    protected void pushmsgtouser(integer code, string userid, jsonobject message) {
        //推送到其他负载处理
        nettymessage nettymessage = new nettymessage();
        nettymessage.setuserid(userid);
        nettymessage.setcode(code);
        nettymessage.setmessage(message);
        redisservice.convertandsend(cacheconstants.topic.sys_socket_push_topic, nettymessage);
        log.info("推送消息成功! userid:{},message:{}", userid, message);
    }
}

nettymessage和pushsocketmsgdto:

import com.alibaba.fastjson2.jsonobject;
import lombok.data;

/**
 * neety消息发布vo
 * @author dsw
 * @date 2024/11/18 13:49
 */
@data
public class nettymessage {

    private integer code;

    private string userid;

    private jsonobject message;

}
import com.alibaba.fastjson2.jsonobject;
import lombok.data;

/**
 * 发送socket消息dto
 * @author dsw
 * @date 2024/11/18 13:39
 */
@data
public class pushsocketmsgdto {
    /**
     * 消息类型
     * 详情看 nettymsgenum
     */
    private integer code;
    /**
     * 用户id
     */
    private list<long> useridlist;
    /**
     * 消息体
     */
    private jsonobject message;
}

测试结果:

以上就是springcloud整合netty集群实现websocket的示例代码的详细内容,更多关于springcloud netty实现websocket的资料请关注代码网其它相关文章!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com