引言
在分布式系统中,spring cloud 为微服务架构提供了丰富的功能,而 netty 是一个高性能的网络通信框架。将二者结合实现 socket 集群,可以满足大规模、高性能网络通信的需求,实现前后端间高效稳定的通信。
1. 服务注册和发现中心
这里服务注册和发行中心使用nacos为例(需要启动一个nacos服务器)。
微服务注册: 在每一个微服务项目中,添加nacos客户端连接,并在配置文件中指定服务名称和端口。例如:
# tomcat server: port: 9201 netty: port: 10201 application: name: yhy-netty-server # spring spring: application: # 应用名称 name: soc-dmoasp-system profiles: # 环境配置 active: dev cloud: nacos: discovery: # 服务注册地址 server-addr: nacos-registry:8858 config: # 配置中心地址 server-addr: nacos-registry:8858 file-extension: yml # 共享配置 shared-configs: - data-id: application.${spring.cloud.nacos.config.file-extension} refresh: true - data-id: soc-dmoasp-redission.${spring.cloud.nacos.config.file-extension} - data-id: soc-dmoasp-druid.${spring.cloud.nacos.config.file-extension}
这是一个基本的服务配置。里面关于netty的applicaiton.name和port可以通过nacos的namingservice类手动注册。
1.1. netty服务器搭建
- 添加netty依赖:在具体微服务中的
pom.xm
l中添加netty依赖:
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-reactor-netty</artifactid> </dependency>
- netty启动类:创建一个nettyserver类,用于启动netty,示例如下:
import com.alibaba.cloud.nacos.nacosdiscoveryproperties; import com.alibaba.nacos.api.propertykeyconst; import com.alibaba.nacos.api.naming.namingfactory; import com.alibaba.nacos.api.naming.namingservice; import com.alibaba.nacos.api.naming.pojo.instance; import com.soc.dmoasp.system.server.handler.websocketidlestatehandler; import io.netty.bootstrap.serverbootstrap; import io.netty.channel.channelfuture; import io.netty.channel.channelinitializer; import io.netty.channel.channelpipeline; import io.netty.channel.eventloopgroup; import io.netty.channel.nio.nioeventloopgroup; import io.netty.channel.socket.socketchannel; import io.netty.channel.socket.nio.nioserversocketchannel; import io.netty.handler.codec.http.httpobjectaggregator; import io.netty.handler.codec.http.httpservercodec; import io.netty.handler.codec.http.websocketx.websocketserverprotocolhandler; import io.netty.handler.stream.chunkedwritehandler; import org.slf4j.logger; import org.slf4j.loggerfactory; import org.springframework.beans.factory.annotation.autowired; import org.springframework.beans.factory.annotation.value; import org.springframework.boot.commandlinerunner; import org.springframework.scheduling.annotation.async; import org.springframework.stereotype.component; import javax.annotation.predestroy; import java.net.inetaddress; import java.util.properties; /** * netty服务 * @author dsw * @date 2024/11/18 17:34 */ @component public class nettyserver implements commandlinerunner { logger log = loggerfactory.getlogger(nettyserver.class); @autowired private nacosdiscoveryproperties nacosdiscoveryproperties; @value("${server.netty.port}") private integer nettyport; @value("${server.netty.application.name}") private string nettyname; eventloopgroup bossgroup; eventloopgroup workgroup; @override public void run(string... args) throws exception { log.info("初始化netty配置开始"); //netty 服务端启动的端口不可和springboot启动类的端口号重复 this.start(); //关闭服务器的时候同时关闭netty服务 runtime.getruntime().addshutdownhook(new thread(() -> { try { this.destroy(); } catch (interruptedexception e) { log.error(e.getmessage()); } })); } @async public void start() throws interruptedexception { try { bossgroup = new nioeventloopgroup(1); workgroup = new nioeventloopgroup(10); serverbootstrap bootstrap = new serverbootstrap(); // bossgroup辅助客户端的tcp连接请求, workgroup负责与客户端之前的读写操作 bootstrap.group(bossgroup, workgroup) // 指定channel .channel(nioserversocketchannel.class) .childhandler(new channelinitializer<socketchannel>() { @override protected void initchannel(socketchannel socketchannel) throws exception { channelpipeline pipeline = socketchannel.pipeline(); pipeline //http解码 .addlast(new httpservercodec()) .addlast(new chunkedwritehandler()) //http段聚合 .addlast(new httpobjectaggregator(1024*1024)) //将http协议转成ws协议 .addlast(new websocketserverprotocolhandler("/socket")) ; } }); registernamingservice(nettyname,nettyport); // 配置完成,开始绑定server,通过调用sync同步方法阻塞直到绑定成功 channelfuture future = bootstrap.bind(nettyport).sync(); if (future.issuccess()) { log.info("server started and listen on:{}", future.channel().localaddress()); log.info("启动 netty server"); } } catch (interruptedexception e) { log.error("netty异常:{}", e.getmessage()); } } /** * 将netty服务注册进nacos * * @param nettyname 服务名称 * @param nettyport 服务端口号 */ private void registernamingservice(string nettyname, integer nettyport) { try { properties properties = new properties(); properties.setproperty(propertykeyconst.server_addr, nacosdiscoveryproperties.getserveraddr()); properties.setproperty(propertykeyconst.namespace, nacosdiscoveryproperties.getnamespace()); properties.setproperty(propertykeyconst.username, nacosdiscoveryproperties.getusername()); properties.setproperty(propertykeyconst.password, nacosdiscoveryproperties.getpassword()); namingservice namingservice = namingfactory.createnamingservice(properties); inetaddress address = inetaddress.getlocalhost(); // 定义服务实例信息 instance instance = new instance(); instance.setip(address.gethostaddress()); instance.setport(nettyport); instance.setweight(1.0); instance.sethealthy(true); namingservice.registerinstance(nettyname, nacosdiscoveryproperties.getgroup(), instance); } catch (exception e) { throw new runtimeexception(e); } } /** * 释放资源 */ @predestroy public void destroy() throws interruptedexception { if (bossgroup != null) { bossgroup.shutdowngracefully().sync(); } if (workgroup != null) { workgroup.shutdowngracefully().sync(); } log.info("关闭netty"); } }
使用commandlinerunner接口实现run方法在启动项目的时候把netty服务带起。
bossgroup
和 workgroup
的角色区别
bossgroup(老板组)
主要职责是负责监听服务器端的端口,等待新的客户端连接请求到来。它就像是公司里负责接待新客户的前台人员,当有新客户(客户端)想要连接到服务器时,
bossgroup
中的eventloop
会接收到这个连接请求。一般情况下,
bossgroup
只需要配置较少数量的eventloop
就可以满足需求,因为它主要处理的是连接建立的初期阶段,即接受新连接这个相对不那么频繁的操作(相比于后续处理大量数据传输等操作)。通常会设置为 1 个eventloop
或者根据服务器的具体性能和预期的连接请求频率适当增加数量,但总体数量相对较少。workgroup(工作组)
一旦
bossgroup
接受了新的客户端连接,就会把这个新连接交给workgroup
来进一步处理后续的所有与该连接相关的操作,比如读取客户端发送的数据、向客户端发送响应数据等。它就像是公司里负责具体为客户办理业务的工作人员。workgroup
需要处理大量的实际业务数据传输和交互工作,所以通常会根据服务器的性能和预期要处理的并发连接数量等因素,配置相对较多数量的eventloop
。例如,在处理高并发场景时,可能会配置几十甚至上百个eventloop
来确保能够高效地处理众多客户端连接的各种业务操作。
registernamingservice方法
这时候可以看到我们nacos配置中配置了server.netty.port
和server.netty.application.name
这两个参数分别对应netty的端口和netty的微服务应用名。
registernamingservice方法用于往nacos中注册服务,这里通过namingservice
类的registerinstance方法
将netty服务注册进nacos中。
1.2. gateway网关转发
微服务中所有的请求都是由网关转发,这里使用gateway转发。
# spring配置 spring: cloud: gateway: discovery: locator: lowercaseserviceid: true enabled: true routes: # 系统模块 - id: soc-dmoasp-system uri: lb://soc-dmoasp-system predicates: - path=/system/** filters: - stripprefix=1 # netty服务 - id: netty-server uri: lb:ws://soc-netty-server predicates: - path=/netty-server/** filters: - stripprefix=1 #不需要进行权限校验的uri security: ignore: whites: - /auth/logout - /auth/login - /auth/register - /*/v2/api-docs - /csrf #netty连接地址 - /netty-server/**
配置文件中添加netty路由,在鉴权网关中需要将socket地址放行,不进行权限验证。例如:
@component @refreshscope public class authfilter implements globalfilter, ordered { private static final logger log = loggerfactory.getlogger(authfilter.class); // 排除过滤的 uri 地址,nacos自行添加 @autowired private ignorewhiteproperties ignorewhite; @override public mono<void> filter(serverwebexchange exchange, gatewayfilterchain chain) { serverhttprequest request = exchange.getrequest(); serverhttprequest.builder mutate = request.mutate(); string url = request.geturi().getpath(); // 跳过不需要验证的路径 if (stringutils.matches(url, ignorewhite.getwhites())) { return chain.filter(exchange); } ...... } }
启动gateway和system模块
启动完成后system模块会打印nettyserver输出的启动日志,nacos中也会有手动注册的netty服务。
通过ws://127.0.0.1:8080/netty-server/socket就可以直接连接上netty服务器(8080为gateway的端口)。
2. 鉴权、心跳、客户端与服务端之间的通信
2.1. 鉴权
创建authhandler类,继承simplechannelinboundhandler
类重写channelread0
方法,channelread0
中可以监听到客户端往服务端发送的消息。 例如:
import com.alibaba.fastjson2.json; import com.alibaba.fastjson2.jsonobject; import com.soc.dmoasp.common.core.constant.cacheconstants; import com.soc.dmoasp.common.core.constant.tokenconstants; import com.soc.dmoasp.common.core.enums.nettymsgenum; import com.soc.dmoasp.common.core.utils.jwtutils; import com.soc.dmoasp.common.core.utils.stringutils; import com.soc.dmoasp.common.redis.service.redisservice; import com.soc.dmoasp.system.server.vo.nettyresult; import io.jsonwebtoken.claims; import io.netty.channel.channelhandlercontext; import io.netty.channel.simplechannelinboundhandler; import io.netty.handler.codec.http.websocketx.textwebsocketframe; import io.netty.util.attributekey; import org.slf4j.logger; import org.slf4j.loggerfactory; import org.springframework.data.redis.core.redistemplate; /** * netty鉴权处理 * @author dsw * @date 2024/11/18 17:55 */ public class authhandler extends simplechannelinboundhandler<textwebsocketframe> { logger log = loggerfactory.getlogger(authhandler.class); private final redistemplate<string, object> redistemplate; private final redisservice redisservice; public authhandler(redistemplate<string, object> stringobjectredistemplate, redisservice redisservice) { redistemplate = stringobjectredistemplate; this.redisservice = redisservice; } @override protected void channelread0(channelhandlercontext ctx, textwebsocketframe textwebsocketframe) throws exception { try { jsonobject clientmessage = json.parseobject(textwebsocketframe.text()); //获取code 只判断code为鉴权的消息类型 integer code = clientmessage.getinteger("code"); if (nettymsgenum.auth_message.getcode().equals(code)) { //获取token string token = clientmessage.getstring("token"); if (stringutils.isempty(token)) { ctx.channel().writeandflush(nettyresult.authfail("令牌不能为空")); ctx.close(); } // 如果前端设置了令牌前缀,则裁剪掉前缀 if (stringutils.isnotempty(token) && token.startswith(tokenconstants.prefix)) { token = token.replacefirst(tokenconstants.prefix, stringutils.empty); } //jwt校验 claims claims = jwtutils.parsetoken(token); if (claims == null) { ctx.channel().writeandflush(nettyresult.authfail("令牌已过期或验证不正确")); ctx.close(); } string userkey = jwtutils.getuserkey(claims); //从redis中查看是否有这个token没有则不能登录 boolean islogin = redisservice.haskey(gettokenkey(userkey)); if (!islogin) { ctx.channel().writeandflush(nettyresult.authfail("登录状态已过期")); ctx.close(); } //获取用户保存至socket连接会话中 string userid = jwtutils.getuserid(claims); attributekey<string> useridkey = attributekey.valueof("userid"); ctx.channel().attr(useridkey).setifabsent(userid); jsonobject jsonobject = new jsonobject(); jsonobject.put("userid",userid); log.info("有新的socket客户端链接 userid :{}", userid); //将连接信息保存至redis中key为userid value为ctx.channel().id() redistemplate.opsforhash().put(cacheconstants.getuserchannelkey(),userid,ctx.channel().id()); ctx.channel().writeandflush(nettyresult.success(nettymsgenum.auth_message.getcode(), "鉴权成功", jsonobject)); //鉴权完成后移除authhandler消息监听 ctx.pipeline().remove(authhandler.class); } else { ctx.channel().writeandflush(nettyresult.authfail("请先鉴权,在发送其他类型请求!")); ctx.close(); } } catch (exception e) { log.error(e.getmessage()); ctx.channel().writeandflush(nettyresult.authfail("鉴权失败")); ctx.close(); } } /** * 获取缓存key */ private string gettokenkey(string token) { return cacheconstants.login_token_key + token; } }
泛型textwebsocketframe
表示接收文本类型的消息。
其中连接的用户信息保存到redis中,redistemplate<string, object> redistemplate
对象是用来保存netty连接信息的,序列化使用的是string(用户信息用string存储,使用json序列化会反序列化失败),对应接收的json串如下:
{"code":1001,token:"bearer xxxx"}
code取nettymsgenum中的code,token则是登录时生成的token令牌。
鉴权后将authhandler移除,会话后续的消息交互不在进authhandler。
nettymsgenum如下:
/** * netty消息类型枚举 * @author dsw * @date 2024/11/18 17:58 */ public enum nettymsgenum { auth_message(1001, "鉴权消息","auth-netty"), //{'code':1003,'data':{'unreadcount':0}} notice_message(1003, "公告通知消息","notice-netty"), heart_message(1006, "心跳消息","heart-netty"), error_message(-1, "错误",null); private final integer code; private final string info; private final string strategyname; nettymsgenum(integer code, string info, string strategyname){ this.code = code; this.info = info; this.strategyname = strategyname; } public static nettymsgenum getbycode(integer code) { for (nettymsgenum msgenum : values()) { if (msgenum.getcode().equals(code)) { return msgenum; } } return error_message; } public integer getcode() { return code; } public string getinfo() { return info; } public string getstrategyname() { return strategyname; } }
nettyresult如下:
import com.alibaba.fastjson2.json; import io.netty.handler.codec.http.websocketx.textwebsocketframe; import java.io.serializable; /** * netty响应实体 * @author dsw * @date 2024/11/18 18:02 */ public class nettyresult implements serializable { private static final long serialversionuid = 1l; private integer code; private string message; private object data; public nettyresult(integer code, string message, object data) { this.code = code; this.message = message; this.data = data; } public static textwebsocketframe fail(string message) { return new textwebsocketframe(json.tojsonstring(new nettyresult(-1, message, null))); } public static textwebsocketframe authfail(string message) { return new textwebsocketframe(json.tojsonstring(new nettyresult(-2, message, null))); } public static textwebsocketframe success( string message) { return new textwebsocketframe(json.tojsonstring(new nettyresult(200, message, null))); } public static textwebsocketframe success(integer code, object data) { return new textwebsocketframe(json.tojsonstring(new nettyresult(code,null, data))); } public static textwebsocketframe success(integer code, string message, object data) { return new textwebsocketframe(json.tojsonstring(new nettyresult(code,message, data))); } public integer getcode() { return code; } public string getmessage() { return message; } public object getdata() { return data; } }
最后到nettyserver中的channelinitializer加入authhandler:
我们重新项目后连接socket查看结果
如果不鉴权直接发送消息,服务端会主动断开连接,客户端需要重连。
这就代表已经连接成功了。
2.2. 空闲检测
创建websocketidlestatehandler
类继承idlestatehandler
类,重写channelidle
方法。
import io.netty.channel.channel; import io.netty.channel.channelhandlercontext; import io.netty.handler.timeout.idlestate; import io.netty.handler.timeout.idlestateevent; import io.netty.handler.timeout.idlestatehandler; import org.slf4j.logger; import org.slf4j.loggerfactory; import java.util.concurrent.timeunit; /** * 空闲检测 * @author dsw * @date 2024/11/18 11:47 */ public class websocketidlestatehandler extends idlestatehandler { logger log = loggerfactory.getlogger(websocketidlestatehandler.class); /** * 默认的读空闲时间 */ private static final int default_reader_idle_time = 10; /** * 默认10秒读空闲断开客户端 */ public websocketidlestatehandler() { super(default_reader_idle_time, 0, 0, timeunit.seconds); } /** * 指定心跳时间(秒) * * @param readeridletimeseconds 读空闲时间 * @param writeridletimeseconds 写空闲时间 * @param allidletimeseconds 读写空闲时间 */ public websocketidlestatehandler(int readeridletimeseconds, int writeridletimeseconds, int allidletimeseconds) { super(readeridletimeseconds, writeridletimeseconds, allidletimeseconds, timeunit.seconds); } /** * 指定心跳时间及时间单位 * * @param readeridletime 读空闲时间 * @param writeridletime 写空闲时间 * @param allidletime 读写空闲时间 * @param unit 时间单位 */ public websocketidlestatehandler(long readeridletime, long writeridletime, long allidletime, timeunit unit) { super(readeridletime, writeridletime, allidletime, unit); } /** * 当空闲事件触发时执行 */ @override protected void channelidle(channelhandlercontext ctx, idlestateevent evt) throws exception { //如果是读空闲 if (evt.state().equals(idlestate.reader_idle)) { channel channel = ctx.channel(); log.debug("服务端未检测到客户端【{}】的心跳包,强制关闭客户端!", channel.id()); channel.close(); } super.channelidle(ctx,evt); } }
以上实现了父类的构造函数,可以指定具体的空闲时间。当空闲时会触发channelidle
方法,则服务端主动断开连接。
最后到nettyserver中的channelinitializer加入websocketidlestatehandler:
加到最前面。
示例设置了10秒断开,需要使用中自行调整。
2.3. 消息通信
2.3.1. 接收客户端的消息
创建websockethandler
类,继承simplechannelinboundhandler
类重写channelread0、handleradded、handlerremoved、exceptioncaught
方法。
channelread0
方法:监听客户端发送过来的消息。handleradded
方法:websocket连接后会调用,将连接信息添加到通道组handlerremoved
方法:断开连接后会调用(服务端、客户端断开都会调用),用于用户下线(删除通道、删除redis中存储的连接信息)exceptioncaught
方法:发生异常后调用,发生异常后服务端通常会主动断开连接。
import com.alibaba.fastjson2.json; import com.alibaba.fastjson2.jsonobject; import com.soc.dmoasp.common.core.constant.cacheconstants; import com.soc.dmoasp.common.core.enums.nettymsgenum; import com.soc.dmoasp.system.server.config.nettyconfig; import com.soc.dmoasp.system.server.strategy.nettystrategy; import com.soc.dmoasp.system.server.strategy.nettystrategyfactory; import com.soc.dmoasp.system.server.vo.nettyresult; import io.netty.channel.channelhandlercontext; import io.netty.channel.simplechannelinboundhandler; import io.netty.handler.codec.http.websocketx.textwebsocketframe; import io.netty.util.attributekey; import org.apache.commons.lang3.stringutils; import org.slf4j.logger; import org.slf4j.loggerfactory; import org.springframework.data.redis.core.redistemplate; /** * websocket处理 * @author dsw * @date 2024/11/18 10:20 */ public class websockethandler extends simplechannelinboundhandler<textwebsocketframe> { logger log = loggerfactory.getlogger(websockethandler.class); private final redistemplate<string, object> redistemplate; private final nettystrategyfactory nettystrategyfactory; public websockethandler(redistemplate<string, object> redistemplate, nettystrategyfactory nettystrategyfactory) { this.redistemplate = redistemplate; this.nettystrategyfactory = nettystrategyfactory; } /** * websocket连接创建后调用 */ @override public void handleradded(channelhandlercontext ctx) throws exception { // 添加到channelgroup 通道组 nettyconfig.getchannelgroup().add(ctx.channel()); } /** * 读取数据 */ @override protected void channelread0(channelhandlercontext ctx, textwebsocketframe frame) throws exception { attributekey<string> useridkey = attributekey.valueof("userid"); string userid = ctx.channel().attr(useridkey).get(); log.info("收到消息 userid:{} message:{}",userid,frame.text()); // 接收客户端的消息 jsonobject pullmessage = json.parseobject(frame.text()); integer code = pullmessage.getinteger("code"); // 获取消息类型 nettystrategy nettystrategy = nettystrategyfactory.getnettystrategy(nettymsgenum.getbycode(code)); // 处理消息 textwebsocketframe pushmessage = nettystrategy.execute(pullmessage); // 返回处理结果给客户端 ctx.channel().writeandflush(pushmessage); } @override public void handlerremoved(channelhandlercontext ctx) throws exception { attributekey<string> useridkey = attributekey.valueof("userid"); string userid = ctx.channel().attr(useridkey).get(); log.info("用户下线了 userid:{}",userid); // 删除通道 removeuserid(ctx); } @override public void exceptioncaught(channelhandlercontext ctx, throwable cause) throws exception { ctx.channel().writeandflush(nettyresult.fail("系统错误:" + cause.getmessage())); } /** * 删除用户与channel的对应关系 */ private void removeuserid(channelhandlercontext ctx) { attributekey<string> useridkey = attributekey.valueof("userid"); string userid = ctx.channel().attr(useridkey).get(); if(stringutils.isnotblank(userid)){ redistemplate.opsforhash().delete(cacheconstants.getuserchannelkey(),userid); } } }
这里收到消息后通过一个策略模式进入不同的策略,通过nettymsgenum
里面定义的code指定不同的策略类。
strategy和strategyfactory:
import com.alibaba.fastjson2.jsonobject; import io.netty.handler.codec.http.websocketx.textwebsocketframe; /** * netty接收消息处理策略类 * @author dsw * @date 2024/5/27 10:21 */ public interface nettystrategy { /** * 执行添加数值 * * @return */ textwebsocketframe execute(jsonobject message); }
import com.soc.dmoasp.common.core.enums.nettymsgenum; import org.slf4j.logger; import org.springframework.beans.factory.annotation.autowired; import org.springframework.stereotype.component; import java.util.map; /** * netty策略工厂 * @author dsw * @date 2024/11/18 10:20 */ @component public class nettystrategyfactory { logger log = org.slf4j.loggerfactory.getlogger(nettystrategyfactory.class); /** * 通过spring容器的方式注入 */ @autowired private map<string, nettystrategy> nettystrategy; /** * 获取对应策略类 * @param */ public nettystrategy getnettystrategy(nettymsgenum nettymsgenum){ if(!nettystrategy.containskey(nettymsgenum.getstrategyname())){ log.warn("没有对应的消息策略"); throw new runtimeexception("没有对应的消息策略"); } return nettystrategy.get(nettymsgenum.getstrategyname()); } }
我们实现一个心跳消息的策略:
import com.alibaba.fastjson2.jsonobject; import com.soc.dmoasp.common.core.enums.nettymsgenum; import com.soc.dmoasp.system.server.strategy.nettystrategy; import com.soc.dmoasp.system.server.vo.nettyresult; import io.netty.handler.codec.http.websocketx.textwebsocketframe; import org.springframework.stereotype.component; /** * netty心跳消息 * @author dsw * @date 2024/5/27 10:25 */ @component("heart-netty") public class heartstrategyimpl implements nettystrategy { @override public textwebsocketframe execute(jsonobject message) { string data = message.getstring("data"); if ("ping".equals(data)) { return nettyresult.success(nettymsgenum.heart_message.getcode(), null, "pong"); } return nettyresult.fail("消息格式不正确"); } }
添加至channelinitializer
后重启查看效果。
2.3.2. 发送消息给客户端
集群下面的netty配置
方案1:使用redis的发布订阅
方案2:使用mq的发布订阅
我们这里使用使用redis的发布订阅实现。
添加redis订阅器:
import com.soc.dmoasp.common.core.constant.cacheconstants; import com.soc.dmoasp.common.redis.configure.fastjson2jsonredisserializer; import com.soc.dmoasp.system.server.receiver.pushmsgredisreceiver; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; import org.springframework.data.redis.connection.redisconnectionfactory; import org.springframework.data.redis.listener.patterntopic; import org.springframework.data.redis.listener.redismessagelistenercontainer; import org.springframework.data.redis.listener.adapter.messagelisteneradapter; import java.util.collections; import java.util.list; /** * redisreceiver 配置 * @author dsw * @date 2024/11/18 12:07 */ @configuration public class redisreceiverconfig { @bean public redismessagelistenercontainer redismessagelistenercontainer(redisconnectionfactory redisconnectionfactory, messagelisteneradapter listeneradapter) { redismessagelistenercontainer container = new redismessagelistenercontainer(); container.setconnectionfactory(redisconnectionfactory); list<patterntopic> topics = collections.singletonlist( patterntopic.of(cacheconstants.topic.sys_socket_push_topic) ); // 添加订阅者监听类,数量不限.patterntopic定义监听主题,这里监听test-topic主题 container.addmessagelistener(listeneradapter, topics); return container; } @bean @suppresswarnings(value = { "unchecked", "rawtypes" }) public messagelisteneradapter listeneradapter(pushmsgredisreceiver pushmsgredisreceiver) { messagelisteneradapter adapter = new messagelisteneradapter(pushmsgredisreceiver); fastjson2jsonredisserializer serializer = new fastjson2jsonredisserializer(object.class); adapter.setserializer(serializer); return adapter; } }
import com.soc.dmoasp.common.core.constant.cacheconstants; import com.soc.dmoasp.common.core.exception.asserts; import com.soc.dmoasp.common.redis.dto.nettymessage; import com.soc.dmoasp.system.server.config.nettyconfig; import com.soc.dmoasp.system.server.vo.nettyresult; import io.netty.channel.channel; import io.netty.channel.channelid; import org.slf4j.logger; import org.slf4j.loggerfactory; import org.springframework.beans.factory.annotation.autowired; import org.springframework.data.redis.core.redistemplate; import org.springframework.stereotype.component; import java.util.objects; /** * netty发送消息接收者 * @author dsw * @date 2024/11/18 12:07 */ @component public class pushmsgredisreceiver { logger log = loggerfactory.getlogger(pushmsgredisreceiver.class); @autowired private redistemplate<string,object> stringobjectredistemplate; /** * redistopic订阅 * @param nettymessage netty消息 * @param topic netty消息对应的topic */ public void handlemessage(nettymessage nettymessage, string topic) { object channelid = stringobjectredistemplate.opsforhash().get(cacheconstants.getuserchannelkey(), nettymessage.getuserid()); if (objects.isnull(channelid)) { log.warn("推送消息失败,用户不在线! userid:{},msg:{}",nettymessage.getuserid(),nettymessage.getmessage()); asserts.fail("推送消息失败,用户不在线!"); } channel channel = nettyconfig.getchannelgroup().find((channelid) channelid); if(channel!=null){ channel.writeandflush(nettyresult.success(nettymessage.getcode(),nettymessage.getmessage())); log.info("推送消息成功! userid:{},msg:{}",nettymessage.getuserid(),nettymessage.getmessage()); }else { log.warn("推送消息失败,没有找到channel! userid:{},msg:{}",nettymessage.getuserid(),nettymessage.getmessage()); } } }
发布订阅的机制就是所有集群都会收到消息,收到消息后每个netty集群都去找对应的消息会话通道,如果没找到则说明连接不到当前服务上,找到通道后则可以直接推送。 这里使用stringobjectredistemplate
获取用户通道,避免序列化失败。
redisservice中实现发布消息
/** * redis消息发布订阅 发布消息 * @param channel 通道id * @param message 消息 */ @async public void convertandsend(string channel, object message) { redistemplate.convertandsend(channel, message); }
发布消息的工具类:
import com.alibaba.fastjson2.jsonobject; import com.soc.dmoasp.common.core.constant.cacheconstants; import com.soc.dmoasp.common.redis.dto.nettymessage; import com.soc.dmoasp.common.redis.dto.pushsocketmsgdto; import org.slf4j.logger; import org.springframework.beans.factory.annotation.autowired; import org.springframework.data.redis.core.redistemplate; import org.springframework.stereotype.service; import java.util.set; /** * netty发送消息给服务端 * @author dsw * @date 2024/11/18 11:47 */ @service public class pushsocketmsgservice { logger log = org.slf4j.loggerfactory.getlogger(pushsocketmsgservice.class); @autowired private redistemplate<string, object> stringobjectredistemplate; @autowired private redisservice redisservice; /** * 给所有用户发送消息 * @param msgdto */ public void pushmsgtoall( pushsocketmsgdto msgdto) { set<object> keys = stringobjectredistemplate.opsforhash().keys(cacheconstants.getuserchannelkey()); keys.foreach(key -> this.pushmsgtouser(msgdto.getcode(), key.tostring(), msgdto.getmessage())); } /** * 给指定用户发送消息 * @param msgdto */ public void pushmsgtouserlist(pushsocketmsgdto msgdto) { for(long userid : msgdto.getuseridlist()){ this.pushmsgtouser(msgdto.getcode(), userid.tostring(), msgdto.getmessage()); } } protected void pushmsgtouser(integer code, string userid, jsonobject message) { //推送到其他负载处理 nettymessage nettymessage = new nettymessage(); nettymessage.setuserid(userid); nettymessage.setcode(code); nettymessage.setmessage(message); redisservice.convertandsend(cacheconstants.topic.sys_socket_push_topic, nettymessage); log.info("推送消息成功! userid:{},message:{}", userid, message); } }
nettymessage和pushsocketmsgdto:
import com.alibaba.fastjson2.jsonobject; import lombok.data; /** * neety消息发布vo * @author dsw * @date 2024/11/18 13:49 */ @data public class nettymessage { private integer code; private string userid; private jsonobject message; }
import com.alibaba.fastjson2.jsonobject; import lombok.data; /** * 发送socket消息dto * @author dsw * @date 2024/11/18 13:39 */ @data public class pushsocketmsgdto { /** * 消息类型 * 详情看 nettymsgenum */ private integer code; /** * 用户id */ private list<long> useridlist; /** * 消息体 */ private jsonobject message; }
测试结果:
以上就是springcloud整合netty集群实现websocket的示例代码的详细内容,更多关于springcloud netty实现websocket的资料请关注代码网其它相关文章!
发表评论