引言
在构建web应用时,消息推送是一个常见需求——比如站内信、订单状态更新、告警通知等。sse(server-sent events)相比websocket更轻量,适合单向推送场景。但当服务部署多个实例时,问题就出现了:用户a连接的是实例1,用户b连接的是实例2,a发送的消息如何推送给b?本文介绍一种基于redis pub/sub的解决方案,让sse连接能够跨实例互通。

上图展示了完整的消息流转过程:
- 建立连接:用户a和b分别连接到不同的后端实例,每个实例维护着自己的sse连接池。
- 发送消息:用户a发起私信请求,请求落在实例1上。
- redis广播:实例1将消息发布到redis的station:message频道,所有订阅了该频道的实例都会收到。
- 推送消息:实例2发现目标用户b在自己身上,通过sse连接将消息推送给b。实例1收到广播后也会检查,发现目标用户不在自己身上,则直接忽略。
一、引入依赖
spring boot web提供了sse的支持(sseemitter),而spring-boot-starter-data-redis则为我们带来了redis连接和pub/sub能力。commons-pool2是连接池,生产环境必备,避免频繁创建连接带来的性能损耗。
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-web</artifactid>
</dependency>
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-data-redis</artifactid>
</dependency>
<dependency>
<groupid>org.apache.commons</groupid>
<artifactid>commons-pool2</artifactid>
</dependency>二、sse连接管理器
sseemittermanager是整个方案的核心。这里有几个设计点值得注意:
- 支持多标签页:一个用户可能打开多个浏览器标签页,所以用map<string, map<string, sseemitter>>来组织,外层key是userid,内层key是token(每个标签页唯一)。
- 生命周期管理:通过oncompletion、ontimeout、onerror回调来清理连接,防止内存泄漏。
- 不超时设置:new sseemitter(0l)表示永不超时,你也可以根据业务需要设置一个合理的超时时间(如30分钟)。
- 全站广播:broadcast()方法会遍历所有在线用户并推送,适合系统公告类消息。
@component
@slf4j
public class sseemittermanager {
/**
* 用户id -> (连接token -> sseemitter)
* 一个用户可能有多个浏览器标签页,用token区分
*/
private final map<string, map<string, sseemitter>> useremitters = new concurrenthashmap<>();
/**
* 建立sse连接
* @param userid 用户id
* @param token 连接标识(可用uuid)
*/
public sseemitter connect(string userid, string token) {
// 超时时间设为0表示不超时,也可设置具体毫秒数
sseemitter emitter = new sseemitter(0l);
// 注册回调:连接关闭时清理
emitter.oncompletion(() -> removeemitter(userid, token));
emitter.ontimeout(() -> removeemitter(userid, token));
emitter.onerror(e -> removeemitter(userid, token));
// 存储连接
useremitters.computeifabsent(userid, k -> new concurrenthashmap<>())
.put(token, emitter);
log.info("sse connected: userid={}, token={}, total users={}",
userid, token, useremitters.size());
return emitter;
}
/**
* 向指定用户推送消息
*/
public void sendtouser(string userid, string message) {
map<string, sseemitter> emitters = useremitters.get(userid);
if (emitters == null || emitters.isempty()) {
log.debug("user {} not online, message stored for later", userid);
return;
}
// 向该用户所有连接推送
emitters.foreach((token, emitter) -> {
try {
emitter.send(sseemitter.event()
.name("message")
.data(message));
} catch (ioexception e) {
log.error("send to user {} failed, removing emitter", userid, e);
removeemitter(userid, token);
}
});
}
/**
* 全站广播
*/
public void broadcast(string message) {
useremitters.foreach((userid, emitters) -> {
sendtouser(userid, message);
});
log.info("broadcast message to {} users", useremitters.size());
}
/**
* 获取当前在线人数
*/
public int getonlinecount() {
return useremitters.size();
}
private void removeemitter(string userid, string token) {
map<string, sseemitter> emitters = useremitters.get(userid);
if (emitters != null) {
emitters.remove(token);
if (emitters.isempty()) {
useremitters.remove(userid);
}
}
}
}三、redis消息订阅者
redismessagesubscriber实现了messagelistener接口,它会监听station:message频道。当收到redis消息时:
- 将json反序列化为stationmessage对象。
- 根据type字段判断是私信还是广播。
- 调用sseemittermanager的方法推送给目标用户。
这里有个细节:每个后端实例都会收到自己发布的消息,所以推送前需要判断目标用户是否在当前实例上——这个判断逻辑其实隐含在sendtouser中:如果目标用户不在本实例的连接池里,就直接返回,不会报错。
@component
@slf4j
public class redismessagesubscriber implements messagelistener {
@autowired
private sseemittermanager sseemittermanager;
@autowired
private objectmapper objectmapper;
@override
public void onmessage(message message, byte[] pattern) {
try {
string channel = new string(message.getchannel());
string body = new string(message.getbody());
// 解析消息
stationmessage msg = objectmapper.readvalue(body, stationmessage.class);
log.info("received redis message: channel={}, type={}, target={}",
channel, msg.gettype(), msg.gettargetuserid());
// 根据消息类型分发
if ("user".equals(msg.gettype())) {
// 私信:发给指定用户
sseemittermanager.sendtouser(msg.gettargetuserid(), msg.getcontent());
} else if ("broadcast".equals(msg.gettype())) {
// 广播:发给所有在线用户
sseemittermanager.broadcast(msg.getcontent());
}
} catch (exception e) {
log.error("failed to process redis message", e);
}
}
}
四、redis配置
redismessagelistenercontainer是spring data redis提供的消息容器,它会自动管理订阅和监听线程。注意这里订阅的是station:message频道,你可以根据业务需要定义多个频道(比如station:notice、station:system)。
redistemplate的序列化配置也很重要:key使用stringredisserializer保证可读性,value使用jackson2jsonredisserializer来支持对象存储。
@configuration
public class redisconfig {
@bean
public redismessagelistenercontainer redismessagelistenercontainer(
redisconnectionfactory connectionfactory,
redismessagesubscriber subscriber) {
redismessagelistenercontainer container = new redismessagelistenercontainer();
container.setconnectionfactory(connectionfactory);
// 订阅站内信频道
container.addmessagelistener(subscriber, new channeltopic("station:message"));
return container;
}
@bean
public redistemplate<string, object> redistemplate(redisconnectionfactory factory) {
redistemplate<string, object> template = new redistemplate<>();
template.setconnectionfactory(factory);
template.setkeyserializer(new stringredisserializer());
template.setvalueserializer(new jackson2jsonredisserializer<>(object.class));
return template;
}
}
五、消息实体
stationmessage是消息的载体,在redis中传输的json格式就对应这个结构。字段设计上:
- id:消息唯一标识,可用于去重和消息历史记录。
- type:区分私信和广播,方便订阅者做路由。
- targetuserid:私信时的接收者,广播时可为空。
- timestamp:时间戳,客户端可以用来做消息排序或展示发送时间。
@data
@builder
@noargsconstructor
@allargsconstructor
public class stationmessage {
private string id; // 消息id
private string type; // user:私信, broadcast:广播
private string targetuserid; // 私信时的目标用户id
private string content; // 消息内容
private string senderid; // 发送者id
private long timestamp; // 时间戳
}
六、消息发送服务
messageservice封装了发送逻辑。核心动作很简单:构造stationmessage -> 序列化为json -> redistemplate.convertandsend()。发布之后,所有实例的订阅者都会收到消息,相当于redis帮我们做了一个“广播式”的跨实例通信。
这种设计的优点是:发送方不需要知道消息最终由哪个实例处理,也不需要维护实例之间的网络连接,所有协调工作都交给了redis。
@service
@slf4j
public class messageservice {
@autowired
private redistemplate<string, object> redistemplate;
@autowired
private sseemittermanager sseemittermanager;
@autowired
private objectmapper objectmapper;
/**
* 发送私信
*/
public void sendprivatemessage(string fromuserid, string touserid, string content) {
stationmessage msg = stationmessage.builder()
.id(uuid.randomuuid().tostring())
.type("user")
.targetuserid(touserid)
.senderid(fromuserid)
.content(content)
.timestamp(system.currenttimemillis())
.build();
try {
string json = objectmapper.writevalueasstring(msg);
// 发布到redis,所有实例都会收到
redistemplate.convertandsend("station:message", json);
log.info("private message sent: {} -> {}", fromuserid, touserid);
} catch (jsonprocessingexception e) {
log.error("failed to serialize message", e);
}
}
/**
* 全站广播
*/
public void broadcast(string fromuserid, string content) {
stationmessage msg = stationmessage.builder()
.id(uuid.randomuuid().tostring())
.type("broadcast")
.senderid(fromuserid)
.content(content)
.timestamp(system.currenttimemillis())
.build();
try {
string json = objectmapper.writevalueasstring(msg);
redistemplate.convertandsend("station:message", json);
log.info("broadcast message sent by: {}", fromuserid);
} catch (jsonprocessingexception e) {
log.error("failed to serialize broadcast", e);
}
}
}
七、controller层
controller对外暴露了三个核心接口:
- get /api/sse/connect:客户端通过eventsource或fetch api调用这个接口建立sse连接。每个连接会生成一个唯一token,用于后续的清理。
- post /api/sse/private:发送私信,需要提供发送者、接收者和消息内容。
- post /api/sse/broadcast:发送广播消息。
- get /api/sse/online-count:查询当前在线人数,可用于展示“在线状态”或做监控。
生产环境建议给这些接口加上认证鉴权(比如从token中解析userid),避免伪造身份。
@restcontroller
@requestmapping("/api/sse")
@slf4j
public class ssecontroller {
@autowired
private sseemittermanager sseemittermanager;
@autowired
private messageservice messageservice;
/**
* sse连接端点
*/
@getmapping(value = "/connect", produces = mediatype.text_event_stream_value)
public sseemitter connect(@requestparam string userid) {
string token = uuid.randomuuid().tostring();
return sseemittermanager.connect(userid, token);
}
/**
* 发送私信
*/
@postmapping("/private")
public responseentity<?> sendprivate(@requestbody privatemessagerequest request) {
messageservice.sendprivatemessage(
request.getfromuserid(),
request.gettouserid(),
request.getcontent()
);
return responseentity.ok().build();
}
/**
* 全站广播
*/
@postmapping("/broadcast")
public responseentity<?> broadcast(@requestbody broadcastrequest request) {
messageservice.broadcast(request.getfromuserid(), request.getcontent());
return responseentity.ok().build();
}
/**
* 获取在线人数
*/
@getmapping("/online-count")
public responseentity<integer> getonlinecount() {
return responseentity.ok(sseemittermanager.getonlinecount());
}
}
以上就是基于springboot + redis pub/sub实现跨实例sse消息推送的详细内容,更多关于springboot跨实例sse消息推送的资料请关注代码网其它相关文章!
发表评论