引言
在分布式系统中,spring cloud 为微服务架构提供了丰富的功能,而 netty 是一个高性能的网络通信框架。将二者结合实现 socket 集群,可以满足大规模、高性能网络通信的需求,实现前后端间高效稳定的通信。
1. 服务注册和发现中心
这里服务注册和发行中心使用nacos为例(需要启动一个nacos服务器)。
微服务注册: 在每一个微服务项目中,添加nacos客户端连接,并在配置文件中指定服务名称和端口。例如:
# tomcat
server:
port: 9201
netty:
port: 10201
application:
name: yhy-netty-server
# spring
spring:
application:
# 应用名称
name: soc-dmoasp-system
profiles:
# 环境配置
active: dev
cloud:
nacos:
discovery:
# 服务注册地址
server-addr: nacos-registry:8858
config:
# 配置中心地址
server-addr: nacos-registry:8858
file-extension: yml
# 共享配置
shared-configs:
- data-id: application.${spring.cloud.nacos.config.file-extension}
refresh: true
- data-id: soc-dmoasp-redission.${spring.cloud.nacos.config.file-extension}
- data-id: soc-dmoasp-druid.${spring.cloud.nacos.config.file-extension}
这是一个基本的服务配置。里面关于netty的applicaiton.name和port可以通过nacos的namingservice类手动注册。
1.1. netty服务器搭建
- 添加netty依赖:在具体微服务中的
pom.xml中添加netty依赖:
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-reactor-netty</artifactid> </dependency>
- netty启动类:创建一个nettyserver类,用于启动netty,示例如下:
import com.alibaba.cloud.nacos.nacosdiscoveryproperties;
import com.alibaba.nacos.api.propertykeyconst;
import com.alibaba.nacos.api.naming.namingfactory;
import com.alibaba.nacos.api.naming.namingservice;
import com.alibaba.nacos.api.naming.pojo.instance;
import com.soc.dmoasp.system.server.handler.websocketidlestatehandler;
import io.netty.bootstrap.serverbootstrap;
import io.netty.channel.channelfuture;
import io.netty.channel.channelinitializer;
import io.netty.channel.channelpipeline;
import io.netty.channel.eventloopgroup;
import io.netty.channel.nio.nioeventloopgroup;
import io.netty.channel.socket.socketchannel;
import io.netty.channel.socket.nio.nioserversocketchannel;
import io.netty.handler.codec.http.httpobjectaggregator;
import io.netty.handler.codec.http.httpservercodec;
import io.netty.handler.codec.http.websocketx.websocketserverprotocolhandler;
import io.netty.handler.stream.chunkedwritehandler;
import org.slf4j.logger;
import org.slf4j.loggerfactory;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.beans.factory.annotation.value;
import org.springframework.boot.commandlinerunner;
import org.springframework.scheduling.annotation.async;
import org.springframework.stereotype.component;
import javax.annotation.predestroy;
import java.net.inetaddress;
import java.util.properties;
/**
* netty服务
* @author dsw
* @date 2024/11/18 17:34
*/
@component
public class nettyserver implements commandlinerunner {
logger log = loggerfactory.getlogger(nettyserver.class);
@autowired
private nacosdiscoveryproperties nacosdiscoveryproperties;
@value("${server.netty.port}")
private integer nettyport;
@value("${server.netty.application.name}")
private string nettyname;
eventloopgroup bossgroup;
eventloopgroup workgroup;
@override
public void run(string... args) throws exception {
log.info("初始化netty配置开始");
//netty 服务端启动的端口不可和springboot启动类的端口号重复
this.start();
//关闭服务器的时候同时关闭netty服务
runtime.getruntime().addshutdownhook(new thread(() -> {
try {
this.destroy();
} catch (interruptedexception e) {
log.error(e.getmessage());
}
}));
}
@async
public void start() throws interruptedexception {
try {
bossgroup = new nioeventloopgroup(1);
workgroup = new nioeventloopgroup(10);
serverbootstrap bootstrap = new serverbootstrap();
// bossgroup辅助客户端的tcp连接请求, workgroup负责与客户端之前的读写操作
bootstrap.group(bossgroup, workgroup)
// 指定channel
.channel(nioserversocketchannel.class)
.childhandler(new channelinitializer<socketchannel>() {
@override
protected void initchannel(socketchannel socketchannel) throws exception {
channelpipeline pipeline = socketchannel.pipeline();
pipeline
//http解码
.addlast(new httpservercodec())
.addlast(new chunkedwritehandler())
//http段聚合
.addlast(new httpobjectaggregator(1024*1024))
//将http协议转成ws协议
.addlast(new websocketserverprotocolhandler("/socket"))
;
}
});
registernamingservice(nettyname,nettyport);
// 配置完成,开始绑定server,通过调用sync同步方法阻塞直到绑定成功
channelfuture future = bootstrap.bind(nettyport).sync();
if (future.issuccess()) {
log.info("server started and listen on:{}", future.channel().localaddress());
log.info("启动 netty server");
}
} catch (interruptedexception e) {
log.error("netty异常:{}", e.getmessage());
}
}
/**
* 将netty服务注册进nacos
*
* @param nettyname 服务名称
* @param nettyport 服务端口号
*/
private void registernamingservice(string nettyname, integer nettyport) {
try {
properties properties = new properties();
properties.setproperty(propertykeyconst.server_addr, nacosdiscoveryproperties.getserveraddr());
properties.setproperty(propertykeyconst.namespace, nacosdiscoveryproperties.getnamespace());
properties.setproperty(propertykeyconst.username, nacosdiscoveryproperties.getusername());
properties.setproperty(propertykeyconst.password, nacosdiscoveryproperties.getpassword());
namingservice namingservice = namingfactory.createnamingservice(properties);
inetaddress address = inetaddress.getlocalhost();
// 定义服务实例信息
instance instance = new instance();
instance.setip(address.gethostaddress());
instance.setport(nettyport);
instance.setweight(1.0);
instance.sethealthy(true);
namingservice.registerinstance(nettyname, nacosdiscoveryproperties.getgroup(), instance);
} catch (exception e) {
throw new runtimeexception(e);
}
}
/**
* 释放资源
*/
@predestroy
public void destroy() throws interruptedexception {
if (bossgroup != null) {
bossgroup.shutdowngracefully().sync();
}
if (workgroup != null) {
workgroup.shutdowngracefully().sync();
}
log.info("关闭netty");
}
}
使用commandlinerunner接口实现run方法在启动项目的时候把netty服务带起。
bossgroup 和 workgroup 的角色区别
bossgroup(老板组)
主要职责是负责监听服务器端的端口,等待新的客户端连接请求到来。它就像是公司里负责接待新客户的前台人员,当有新客户(客户端)想要连接到服务器时,
bossgroup中的eventloop会接收到这个连接请求。一般情况下,
bossgroup只需要配置较少数量的eventloop就可以满足需求,因为它主要处理的是连接建立的初期阶段,即接受新连接这个相对不那么频繁的操作(相比于后续处理大量数据传输等操作)。通常会设置为 1 个eventloop或者根据服务器的具体性能和预期的连接请求频率适当增加数量,但总体数量相对较少。workgroup(工作组)
一旦
bossgroup接受了新的客户端连接,就会把这个新连接交给workgroup来进一步处理后续的所有与该连接相关的操作,比如读取客户端发送的数据、向客户端发送响应数据等。它就像是公司里负责具体为客户办理业务的工作人员。workgroup需要处理大量的实际业务数据传输和交互工作,所以通常会根据服务器的性能和预期要处理的并发连接数量等因素,配置相对较多数量的eventloop。例如,在处理高并发场景时,可能会配置几十甚至上百个eventloop来确保能够高效地处理众多客户端连接的各种业务操作。
registernamingservice方法
这时候可以看到我们nacos配置中配置了server.netty.port和server.netty.application.name这两个参数分别对应netty的端口和netty的微服务应用名。
registernamingservice方法用于往nacos中注册服务,这里通过namingservice类的registerinstance方法将netty服务注册进nacos中。
1.2. gateway网关转发
微服务中所有的请求都是由网关转发,这里使用gateway转发。
# spring配置
spring:
cloud:
gateway:
discovery:
locator:
lowercaseserviceid: true
enabled: true
routes:
# 系统模块
- id: soc-dmoasp-system
uri: lb://soc-dmoasp-system
predicates:
- path=/system/**
filters:
- stripprefix=1
# netty服务
- id: netty-server
uri: lb:ws://soc-netty-server
predicates:
- path=/netty-server/**
filters:
- stripprefix=1
#不需要进行权限校验的uri
security:
ignore:
whites:
- /auth/logout
- /auth/login
- /auth/register
- /*/v2/api-docs
- /csrf
#netty连接地址
- /netty-server/**
配置文件中添加netty路由,在鉴权网关中需要将socket地址放行,不进行权限验证。例如:
@component
@refreshscope
public class authfilter implements globalfilter, ordered
{
private static final logger log = loggerfactory.getlogger(authfilter.class);
// 排除过滤的 uri 地址,nacos自行添加
@autowired
private ignorewhiteproperties ignorewhite;
@override
public mono<void> filter(serverwebexchange exchange, gatewayfilterchain chain)
{
serverhttprequest request = exchange.getrequest();
serverhttprequest.builder mutate = request.mutate();
string url = request.geturi().getpath();
// 跳过不需要验证的路径
if (stringutils.matches(url, ignorewhite.getwhites()))
{
return chain.filter(exchange);
}
......
}
}
启动gateway和system模块


启动完成后system模块会打印nettyserver输出的启动日志,nacos中也会有手动注册的netty服务。
通过ws://127.0.0.1:8080/netty-server/socket就可以直接连接上netty服务器(8080为gateway的端口)。

2. 鉴权、心跳、客户端与服务端之间的通信
2.1. 鉴权
创建authhandler类,继承simplechannelinboundhandler类重写channelread0方法,channelread0中可以监听到客户端往服务端发送的消息。 例如:
import com.alibaba.fastjson2.json;
import com.alibaba.fastjson2.jsonobject;
import com.soc.dmoasp.common.core.constant.cacheconstants;
import com.soc.dmoasp.common.core.constant.tokenconstants;
import com.soc.dmoasp.common.core.enums.nettymsgenum;
import com.soc.dmoasp.common.core.utils.jwtutils;
import com.soc.dmoasp.common.core.utils.stringutils;
import com.soc.dmoasp.common.redis.service.redisservice;
import com.soc.dmoasp.system.server.vo.nettyresult;
import io.jsonwebtoken.claims;
import io.netty.channel.channelhandlercontext;
import io.netty.channel.simplechannelinboundhandler;
import io.netty.handler.codec.http.websocketx.textwebsocketframe;
import io.netty.util.attributekey;
import org.slf4j.logger;
import org.slf4j.loggerfactory;
import org.springframework.data.redis.core.redistemplate;
/**
* netty鉴权处理
* @author dsw
* @date 2024/11/18 17:55
*/
public class authhandler extends simplechannelinboundhandler<textwebsocketframe> {
logger log = loggerfactory.getlogger(authhandler.class);
private final redistemplate<string, object> redistemplate;
private final redisservice redisservice;
public authhandler(redistemplate<string, object> stringobjectredistemplate, redisservice redisservice) {
redistemplate = stringobjectredistemplate;
this.redisservice = redisservice;
}
@override
protected void channelread0(channelhandlercontext ctx, textwebsocketframe textwebsocketframe) throws exception {
try {
jsonobject clientmessage = json.parseobject(textwebsocketframe.text());
//获取code 只判断code为鉴权的消息类型
integer code = clientmessage.getinteger("code");
if (nettymsgenum.auth_message.getcode().equals(code)) {
//获取token
string token = clientmessage.getstring("token");
if (stringutils.isempty(token))
{
ctx.channel().writeandflush(nettyresult.authfail("令牌不能为空"));
ctx.close();
}
// 如果前端设置了令牌前缀,则裁剪掉前缀
if (stringutils.isnotempty(token) && token.startswith(tokenconstants.prefix))
{
token = token.replacefirst(tokenconstants.prefix, stringutils.empty);
}
//jwt校验
claims claims = jwtutils.parsetoken(token);
if (claims == null)
{
ctx.channel().writeandflush(nettyresult.authfail("令牌已过期或验证不正确"));
ctx.close();
}
string userkey = jwtutils.getuserkey(claims);
//从redis中查看是否有这个token没有则不能登录
boolean islogin = redisservice.haskey(gettokenkey(userkey));
if (!islogin)
{
ctx.channel().writeandflush(nettyresult.authfail("登录状态已过期"));
ctx.close();
}
//获取用户保存至socket连接会话中
string userid = jwtutils.getuserid(claims);
attributekey<string> useridkey = attributekey.valueof("userid");
ctx.channel().attr(useridkey).setifabsent(userid);
jsonobject jsonobject = new jsonobject();
jsonobject.put("userid",userid);
log.info("有新的socket客户端链接 userid :{}", userid);
//将连接信息保存至redis中key为userid value为ctx.channel().id()
redistemplate.opsforhash().put(cacheconstants.getuserchannelkey(),userid,ctx.channel().id());
ctx.channel().writeandflush(nettyresult.success(nettymsgenum.auth_message.getcode(), "鉴权成功", jsonobject));
//鉴权完成后移除authhandler消息监听
ctx.pipeline().remove(authhandler.class);
} else {
ctx.channel().writeandflush(nettyresult.authfail("请先鉴权,在发送其他类型请求!"));
ctx.close();
}
} catch (exception e) {
log.error(e.getmessage());
ctx.channel().writeandflush(nettyresult.authfail("鉴权失败"));
ctx.close();
}
}
/**
* 获取缓存key
*/
private string gettokenkey(string token)
{
return cacheconstants.login_token_key + token;
}
}
泛型textwebsocketframe表示接收文本类型的消息。
其中连接的用户信息保存到redis中,redistemplate<string, object> redistemplate对象是用来保存netty连接信息的,序列化使用的是string(用户信息用string存储,使用json序列化会反序列化失败),对应接收的json串如下:
{"code":1001,token:"bearer xxxx"}
code取nettymsgenum中的code,token则是登录时生成的token令牌。
鉴权后将authhandler移除,会话后续的消息交互不在进authhandler。
nettymsgenum如下:
/**
* netty消息类型枚举
* @author dsw
* @date 2024/11/18 17:58
*/
public enum nettymsgenum {
auth_message(1001, "鉴权消息","auth-netty"),
//{'code':1003,'data':{'unreadcount':0}}
notice_message(1003, "公告通知消息","notice-netty"),
heart_message(1006, "心跳消息","heart-netty"),
error_message(-1, "错误",null);
private final integer code;
private final string info;
private final string strategyname;
nettymsgenum(integer code, string info, string strategyname){
this.code = code;
this.info = info;
this.strategyname = strategyname;
}
public static nettymsgenum getbycode(integer code) {
for (nettymsgenum msgenum : values()) {
if (msgenum.getcode().equals(code)) {
return msgenum;
}
}
return error_message;
}
public integer getcode() {
return code;
}
public string getinfo() {
return info;
}
public string getstrategyname() {
return strategyname;
}
}
nettyresult如下:
import com.alibaba.fastjson2.json;
import io.netty.handler.codec.http.websocketx.textwebsocketframe;
import java.io.serializable;
/**
* netty响应实体
* @author dsw
* @date 2024/11/18 18:02
*/
public class nettyresult implements serializable {
private static final long serialversionuid = 1l;
private integer code;
private string message;
private object data;
public nettyresult(integer code, string message, object data) {
this.code = code;
this.message = message;
this.data = data;
}
public static textwebsocketframe fail(string message) {
return new textwebsocketframe(json.tojsonstring(new nettyresult(-1, message, null)));
}
public static textwebsocketframe authfail(string message) {
return new textwebsocketframe(json.tojsonstring(new nettyresult(-2, message, null)));
}
public static textwebsocketframe success( string message) {
return new textwebsocketframe(json.tojsonstring(new nettyresult(200, message, null)));
}
public static textwebsocketframe success(integer code, object data) {
return new textwebsocketframe(json.tojsonstring(new nettyresult(code,null, data)));
}
public static textwebsocketframe success(integer code, string message, object data) {
return new textwebsocketframe(json.tojsonstring(new nettyresult(code,message, data)));
}
public integer getcode() {
return code;
}
public string getmessage() {
return message;
}
public object getdata() {
return data;
}
}
最后到nettyserver中的channelinitializer加入authhandler:

我们重新项目后连接socket查看结果

如果不鉴权直接发送消息,服务端会主动断开连接,客户端需要重连。

这就代表已经连接成功了。
2.2. 空闲检测
创建websocketidlestatehandler类继承idlestatehandler类,重写channelidle方法。
import io.netty.channel.channel;
import io.netty.channel.channelhandlercontext;
import io.netty.handler.timeout.idlestate;
import io.netty.handler.timeout.idlestateevent;
import io.netty.handler.timeout.idlestatehandler;
import org.slf4j.logger;
import org.slf4j.loggerfactory;
import java.util.concurrent.timeunit;
/**
* 空闲检测
* @author dsw
* @date 2024/11/18 11:47
*/
public class websocketidlestatehandler extends idlestatehandler {
logger log = loggerfactory.getlogger(websocketidlestatehandler.class);
/**
* 默认的读空闲时间
*/
private static final int default_reader_idle_time = 10;
/**
* 默认10秒读空闲断开客户端
*/
public websocketidlestatehandler() {
super(default_reader_idle_time, 0, 0, timeunit.seconds);
}
/**
* 指定心跳时间(秒)
*
* @param readeridletimeseconds 读空闲时间
* @param writeridletimeseconds 写空闲时间
* @param allidletimeseconds 读写空闲时间
*/
public websocketidlestatehandler(int readeridletimeseconds, int writeridletimeseconds, int allidletimeseconds) {
super(readeridletimeseconds, writeridletimeseconds, allidletimeseconds, timeunit.seconds);
}
/**
* 指定心跳时间及时间单位
*
* @param readeridletime 读空闲时间
* @param writeridletime 写空闲时间
* @param allidletime 读写空闲时间
* @param unit 时间单位
*/
public websocketidlestatehandler(long readeridletime, long writeridletime, long allidletime, timeunit unit) {
super(readeridletime, writeridletime, allidletime, unit);
}
/**
* 当空闲事件触发时执行
*/
@override
protected void channelidle(channelhandlercontext ctx, idlestateevent evt) throws exception {
//如果是读空闲
if (evt.state().equals(idlestate.reader_idle)) {
channel channel = ctx.channel();
log.debug("服务端未检测到客户端【{}】的心跳包,强制关闭客户端!", channel.id());
channel.close();
}
super.channelidle(ctx,evt);
}
}
以上实现了父类的构造函数,可以指定具体的空闲时间。当空闲时会触发channelidle方法,则服务端主动断开连接。
最后到nettyserver中的channelinitializer加入websocketidlestatehandler:

加到最前面。

示例设置了10秒断开,需要使用中自行调整。
2.3. 消息通信
2.3.1. 接收客户端的消息
创建websockethandler类,继承simplechannelinboundhandler类重写channelread0、handleradded、handlerremoved、exceptioncaught方法。
channelread0方法:监听客户端发送过来的消息。handleradded方法:websocket连接后会调用,将连接信息添加到通道组handlerremoved方法:断开连接后会调用(服务端、客户端断开都会调用),用于用户下线(删除通道、删除redis中存储的连接信息)exceptioncaught方法:发生异常后调用,发生异常后服务端通常会主动断开连接。
import com.alibaba.fastjson2.json;
import com.alibaba.fastjson2.jsonobject;
import com.soc.dmoasp.common.core.constant.cacheconstants;
import com.soc.dmoasp.common.core.enums.nettymsgenum;
import com.soc.dmoasp.system.server.config.nettyconfig;
import com.soc.dmoasp.system.server.strategy.nettystrategy;
import com.soc.dmoasp.system.server.strategy.nettystrategyfactory;
import com.soc.dmoasp.system.server.vo.nettyresult;
import io.netty.channel.channelhandlercontext;
import io.netty.channel.simplechannelinboundhandler;
import io.netty.handler.codec.http.websocketx.textwebsocketframe;
import io.netty.util.attributekey;
import org.apache.commons.lang3.stringutils;
import org.slf4j.logger;
import org.slf4j.loggerfactory;
import org.springframework.data.redis.core.redistemplate;
/**
* websocket处理
* @author dsw
* @date 2024/11/18 10:20
*/
public class websockethandler extends simplechannelinboundhandler<textwebsocketframe> {
logger log = loggerfactory.getlogger(websockethandler.class);
private final redistemplate<string, object> redistemplate;
private final nettystrategyfactory nettystrategyfactory;
public websockethandler(redistemplate<string, object> redistemplate, nettystrategyfactory nettystrategyfactory) {
this.redistemplate = redistemplate;
this.nettystrategyfactory = nettystrategyfactory;
}
/**
* websocket连接创建后调用
*/
@override
public void handleradded(channelhandlercontext ctx) throws exception {
// 添加到channelgroup 通道组
nettyconfig.getchannelgroup().add(ctx.channel());
}
/**
* 读取数据
*/
@override
protected void channelread0(channelhandlercontext ctx, textwebsocketframe frame) throws exception {
attributekey<string> useridkey = attributekey.valueof("userid");
string userid = ctx.channel().attr(useridkey).get();
log.info("收到消息 userid:{} message:{}",userid,frame.text());
// 接收客户端的消息
jsonobject pullmessage = json.parseobject(frame.text());
integer code = pullmessage.getinteger("code");
// 获取消息类型
nettystrategy nettystrategy = nettystrategyfactory.getnettystrategy(nettymsgenum.getbycode(code));
// 处理消息
textwebsocketframe pushmessage = nettystrategy.execute(pullmessage);
// 返回处理结果给客户端
ctx.channel().writeandflush(pushmessage);
}
@override
public void handlerremoved(channelhandlercontext ctx) throws exception {
attributekey<string> useridkey = attributekey.valueof("userid");
string userid = ctx.channel().attr(useridkey).get();
log.info("用户下线了 userid:{}",userid);
// 删除通道
removeuserid(ctx);
}
@override
public void exceptioncaught(channelhandlercontext ctx, throwable cause) throws exception {
ctx.channel().writeandflush(nettyresult.fail("系统错误:" + cause.getmessage()));
}
/**
* 删除用户与channel的对应关系
*/
private void removeuserid(channelhandlercontext ctx) {
attributekey<string> useridkey = attributekey.valueof("userid");
string userid = ctx.channel().attr(useridkey).get();
if(stringutils.isnotblank(userid)){
redistemplate.opsforhash().delete(cacheconstants.getuserchannelkey(),userid);
}
}
}
这里收到消息后通过一个策略模式进入不同的策略,通过nettymsgenum里面定义的code指定不同的策略类。
strategy和strategyfactory:
import com.alibaba.fastjson2.jsonobject;
import io.netty.handler.codec.http.websocketx.textwebsocketframe;
/**
* netty接收消息处理策略类
* @author dsw
* @date 2024/5/27 10:21
*/
public interface nettystrategy {
/**
* 执行添加数值
*
* @return
*/
textwebsocketframe execute(jsonobject message);
}
import com.soc.dmoasp.common.core.enums.nettymsgenum;
import org.slf4j.logger;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.component;
import java.util.map;
/**
* netty策略工厂
* @author dsw
* @date 2024/11/18 10:20
*/
@component
public class nettystrategyfactory {
logger log = org.slf4j.loggerfactory.getlogger(nettystrategyfactory.class);
/**
* 通过spring容器的方式注入
*/
@autowired
private map<string, nettystrategy> nettystrategy;
/**
* 获取对应策略类
* @param
*/
public nettystrategy getnettystrategy(nettymsgenum nettymsgenum){
if(!nettystrategy.containskey(nettymsgenum.getstrategyname())){
log.warn("没有对应的消息策略");
throw new runtimeexception("没有对应的消息策略");
}
return nettystrategy.get(nettymsgenum.getstrategyname());
}
}
我们实现一个心跳消息的策略:
import com.alibaba.fastjson2.jsonobject;
import com.soc.dmoasp.common.core.enums.nettymsgenum;
import com.soc.dmoasp.system.server.strategy.nettystrategy;
import com.soc.dmoasp.system.server.vo.nettyresult;
import io.netty.handler.codec.http.websocketx.textwebsocketframe;
import org.springframework.stereotype.component;
/**
* netty心跳消息
* @author dsw
* @date 2024/5/27 10:25
*/
@component("heart-netty")
public class heartstrategyimpl implements nettystrategy {
@override
public textwebsocketframe execute(jsonobject message) {
string data = message.getstring("data");
if ("ping".equals(data)) {
return nettyresult.success(nettymsgenum.heart_message.getcode(), null, "pong");
}
return nettyresult.fail("消息格式不正确");
}
}

添加至channelinitializer后重启查看效果。

2.3.2. 发送消息给客户端
集群下面的netty配置
方案1:使用redis的发布订阅
方案2:使用mq的发布订阅
我们这里使用使用redis的发布订阅实现。
添加redis订阅器:
import com.soc.dmoasp.common.core.constant.cacheconstants;
import com.soc.dmoasp.common.redis.configure.fastjson2jsonredisserializer;
import com.soc.dmoasp.system.server.receiver.pushmsgredisreceiver;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.data.redis.connection.redisconnectionfactory;
import org.springframework.data.redis.listener.patterntopic;
import org.springframework.data.redis.listener.redismessagelistenercontainer;
import org.springframework.data.redis.listener.adapter.messagelisteneradapter;
import java.util.collections;
import java.util.list;
/**
* redisreceiver 配置
* @author dsw
* @date 2024/11/18 12:07
*/
@configuration
public class redisreceiverconfig {
@bean
public redismessagelistenercontainer redismessagelistenercontainer(redisconnectionfactory redisconnectionfactory,
messagelisteneradapter listeneradapter) {
redismessagelistenercontainer container = new redismessagelistenercontainer();
container.setconnectionfactory(redisconnectionfactory);
list<patterntopic> topics = collections.singletonlist(
patterntopic.of(cacheconstants.topic.sys_socket_push_topic)
);
// 添加订阅者监听类,数量不限.patterntopic定义监听主题,这里监听test-topic主题
container.addmessagelistener(listeneradapter, topics);
return container;
}
@bean
@suppresswarnings(value = { "unchecked", "rawtypes" })
public messagelisteneradapter listeneradapter(pushmsgredisreceiver pushmsgredisreceiver) {
messagelisteneradapter adapter = new messagelisteneradapter(pushmsgredisreceiver);
fastjson2jsonredisserializer serializer = new fastjson2jsonredisserializer(object.class);
adapter.setserializer(serializer);
return adapter;
}
}
import com.soc.dmoasp.common.core.constant.cacheconstants;
import com.soc.dmoasp.common.core.exception.asserts;
import com.soc.dmoasp.common.redis.dto.nettymessage;
import com.soc.dmoasp.system.server.config.nettyconfig;
import com.soc.dmoasp.system.server.vo.nettyresult;
import io.netty.channel.channel;
import io.netty.channel.channelid;
import org.slf4j.logger;
import org.slf4j.loggerfactory;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.data.redis.core.redistemplate;
import org.springframework.stereotype.component;
import java.util.objects;
/**
* netty发送消息接收者
* @author dsw
* @date 2024/11/18 12:07
*/
@component
public class pushmsgredisreceiver {
logger log = loggerfactory.getlogger(pushmsgredisreceiver.class);
@autowired
private redistemplate<string,object> stringobjectredistemplate;
/**
* redistopic订阅
* @param nettymessage netty消息
* @param topic netty消息对应的topic
*/
public void handlemessage(nettymessage nettymessage, string topic) {
object channelid = stringobjectredistemplate.opsforhash().get(cacheconstants.getuserchannelkey(), nettymessage.getuserid());
if (objects.isnull(channelid)) {
log.warn("推送消息失败,用户不在线! userid:{},msg:{}",nettymessage.getuserid(),nettymessage.getmessage());
asserts.fail("推送消息失败,用户不在线!");
}
channel channel = nettyconfig.getchannelgroup().find((channelid) channelid);
if(channel!=null){
channel.writeandflush(nettyresult.success(nettymessage.getcode(),nettymessage.getmessage()));
log.info("推送消息成功! userid:{},msg:{}",nettymessage.getuserid(),nettymessage.getmessage());
}else {
log.warn("推送消息失败,没有找到channel! userid:{},msg:{}",nettymessage.getuserid(),nettymessage.getmessage());
}
}
}
发布订阅的机制就是所有集群都会收到消息,收到消息后每个netty集群都去找对应的消息会话通道,如果没找到则说明连接不到当前服务上,找到通道后则可以直接推送。 这里使用stringobjectredistemplate获取用户通道,避免序列化失败。
redisservice中实现发布消息
/**
* redis消息发布订阅 发布消息
* @param channel 通道id
* @param message 消息
*/
@async
public void convertandsend(string channel, object message) {
redistemplate.convertandsend(channel, message);
}
发布消息的工具类:
import com.alibaba.fastjson2.jsonobject;
import com.soc.dmoasp.common.core.constant.cacheconstants;
import com.soc.dmoasp.common.redis.dto.nettymessage;
import com.soc.dmoasp.common.redis.dto.pushsocketmsgdto;
import org.slf4j.logger;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.data.redis.core.redistemplate;
import org.springframework.stereotype.service;
import java.util.set;
/**
* netty发送消息给服务端
* @author dsw
* @date 2024/11/18 11:47
*/
@service
public class pushsocketmsgservice {
logger log = org.slf4j.loggerfactory.getlogger(pushsocketmsgservice.class);
@autowired
private redistemplate<string, object> stringobjectredistemplate;
@autowired
private redisservice redisservice;
/**
* 给所有用户发送消息
* @param msgdto
*/
public void pushmsgtoall( pushsocketmsgdto msgdto) {
set<object> keys = stringobjectredistemplate.opsforhash().keys(cacheconstants.getuserchannelkey());
keys.foreach(key -> this.pushmsgtouser(msgdto.getcode(), key.tostring(), msgdto.getmessage()));
}
/**
* 给指定用户发送消息
* @param msgdto
*/
public void pushmsgtouserlist(pushsocketmsgdto msgdto) {
for(long userid : msgdto.getuseridlist()){
this.pushmsgtouser(msgdto.getcode(), userid.tostring(), msgdto.getmessage());
}
}
protected void pushmsgtouser(integer code, string userid, jsonobject message) {
//推送到其他负载处理
nettymessage nettymessage = new nettymessage();
nettymessage.setuserid(userid);
nettymessage.setcode(code);
nettymessage.setmessage(message);
redisservice.convertandsend(cacheconstants.topic.sys_socket_push_topic, nettymessage);
log.info("推送消息成功! userid:{},message:{}", userid, message);
}
}
nettymessage和pushsocketmsgdto:
import com.alibaba.fastjson2.jsonobject;
import lombok.data;
/**
* neety消息发布vo
* @author dsw
* @date 2024/11/18 13:49
*/
@data
public class nettymessage {
private integer code;
private string userid;
private jsonobject message;
}
import com.alibaba.fastjson2.jsonobject;
import lombok.data;
/**
* 发送socket消息dto
* @author dsw
* @date 2024/11/18 13:39
*/
@data
public class pushsocketmsgdto {
/**
* 消息类型
* 详情看 nettymsgenum
*/
private integer code;
/**
* 用户id
*/
private list<long> useridlist;
/**
* 消息体
*/
private jsonobject message;
}
测试结果:



以上就是springcloud整合netty集群实现websocket的示例代码的详细内容,更多关于springcloud netty实现websocket的资料请关注代码网其它相关文章!
发表评论