当前位置: 代码网 > it编程>编程语言>Java > 基于SpringBoot + Redis Pub/Sub实现跨实例SSE消息推送

基于SpringBoot + Redis Pub/Sub实现跨实例SSE消息推送

2026年04月23日 Java 我要评论
引言在构建web应用时,消息推送是一个常见需求——比如站内信、订单状态更新、告警通知等。sse(server-sent events)相比websocket更轻量,适合单向推送

引言

在构建web应用时,消息推送是一个常见需求——比如站内信、订单状态更新、告警通知等。sse(server-sent events)相比websocket更轻量,适合单向推送场景。但当服务部署多个实例时,问题就出现了:用户a连接的是实例1,用户b连接的是实例2,a发送的消息如何推送给b?本文介绍一种基于redis pub/sub的解决方案,让sse连接能够跨实例互通。

上图展示了完整的消息流转过程:

  1. 建立连接:用户a和b分别连接到不同的后端实例,每个实例维护着自己的sse连接池。
  2. 发送消息:用户a发起私信请求,请求落在实例1上。
  3. redis广播:实例1将消息发布到redis的station:message频道,所有订阅了该频道的实例都会收到。
  4. 推送消息:实例2发现目标用户b在自己身上,通过sse连接将消息推送给b。实例1收到广播后也会检查,发现目标用户不在自己身上,则直接忽略。

一、引入依赖

spring boot web提供了sse的支持(sseemitter),而spring-boot-starter-data-redis则为我们带来了redis连接和pub/sub能力。commons-pool2是连接池,生产环境必备,避免频繁创建连接带来的性能损耗。

<dependency>
    <groupid>org.springframework.boot</groupid>
    <artifactid>spring-boot-starter-web</artifactid>
</dependency>
<dependency>
    <groupid>org.springframework.boot</groupid>
    <artifactid>spring-boot-starter-data-redis</artifactid>
</dependency>
<dependency>
    <groupid>org.apache.commons</groupid>
    <artifactid>commons-pool2</artifactid>
</dependency>

二、sse连接管理器

sseemittermanager是整个方案的核心。这里有几个设计点值得注意:

  • 支持多标签页:一个用户可能打开多个浏览器标签页,所以用map<string, map<string, sseemitter>>来组织,外层key是userid,内层key是token(每个标签页唯一)。
  • 生命周期管理:通过oncompletion、ontimeout、onerror回调来清理连接,防止内存泄漏。
  • 不超时设置:new sseemitter(0l)表示永不超时,你也可以根据业务需要设置一个合理的超时时间(如30分钟)。
  • 全站广播:broadcast()方法会遍历所有在线用户并推送,适合系统公告类消息。
@component
@slf4j
public class sseemittermanager {
    /**
     * 用户id -> (连接token -> sseemitter)
     * 一个用户可能有多个浏览器标签页,用token区分
     */
    private final map<string, map<string, sseemitter>> useremitters = new concurrenthashmap<>();
    /**
     * 建立sse连接
     * @param userid 用户id
     * @param token 连接标识(可用uuid)
     */
    public sseemitter connect(string userid, string token) {
        // 超时时间设为0表示不超时,也可设置具体毫秒数
        sseemitter emitter = new sseemitter(0l);
        // 注册回调:连接关闭时清理
        emitter.oncompletion(() -> removeemitter(userid, token));
        emitter.ontimeout(() -> removeemitter(userid, token));
        emitter.onerror(e -> removeemitter(userid, token));
        // 存储连接
        useremitters.computeifabsent(userid, k -> new concurrenthashmap<>())
                    .put(token, emitter);
        log.info("sse connected: userid={}, token={}, total users={}", 
                 userid, token, useremitters.size());
        return emitter;
    }
    /**
     * 向指定用户推送消息
     */
    public void sendtouser(string userid, string message) {
        map<string, sseemitter> emitters = useremitters.get(userid);
        if (emitters == null || emitters.isempty()) {
            log.debug("user {} not online, message stored for later", userid);
            return;
        }
        // 向该用户所有连接推送
        emitters.foreach((token, emitter) -> {
            try {
                emitter.send(sseemitter.event()
                    .name("message")
                    .data(message));
            } catch (ioexception e) {
                log.error("send to user {} failed, removing emitter", userid, e);
                removeemitter(userid, token);
            }
        });
    }
    /**
     * 全站广播
     */
    public void broadcast(string message) {
        useremitters.foreach((userid, emitters) -> {
            sendtouser(userid, message);
        });
        log.info("broadcast message to {} users", useremitters.size());
    }
    /**
     * 获取当前在线人数
     */
    public int getonlinecount() {
        return useremitters.size();
    }
    private void removeemitter(string userid, string token) {
        map<string, sseemitter> emitters = useremitters.get(userid);
        if (emitters != null) {
            emitters.remove(token);
            if (emitters.isempty()) {
                useremitters.remove(userid);
            }
        }
    }
}

三、redis消息订阅者

redismessagesubscriber实现了messagelistener接口,它会监听station:message频道。当收到redis消息时:

  • 将json反序列化为stationmessage对象。
  • 根据type字段判断是私信还是广播。
  • 调用sseemittermanager的方法推送给目标用户。

这里有个细节:每个后端实例都会收到自己发布的消息,所以推送前需要判断目标用户是否在当前实例上——这个判断逻辑其实隐含在sendtouser中:如果目标用户不在本实例的连接池里,就直接返回,不会报错。

@component
@slf4j
public class redismessagesubscriber implements messagelistener {
    
    @autowired
    private sseemittermanager sseemittermanager;
    
    @autowired
    private objectmapper objectmapper;
    
    @override
    public void onmessage(message message, byte[] pattern) {
        try {
            string channel = new string(message.getchannel());
            string body = new string(message.getbody());
            
            // 解析消息
            stationmessage msg = objectmapper.readvalue(body, stationmessage.class);
            
            log.info("received redis message: channel={}, type={}, target={}", 
                     channel, msg.gettype(), msg.gettargetuserid());
            
            // 根据消息类型分发
            if ("user".equals(msg.gettype())) {
                // 私信:发给指定用户
                sseemittermanager.sendtouser(msg.gettargetuserid(), msg.getcontent());
            } else if ("broadcast".equals(msg.gettype())) {
                // 广播:发给所有在线用户
                sseemittermanager.broadcast(msg.getcontent());
            }
            
        } catch (exception e) {
            log.error("failed to process redis message", e);
        }
    }
}

四、redis配置

redismessagelistenercontainer是spring data redis提供的消息容器,它会自动管理订阅和监听线程。注意这里订阅的是station:message频道,你可以根据业务需要定义多个频道(比如station:notice、station:system)。

redistemplate的序列化配置也很重要:key使用stringredisserializer保证可读性,value使用jackson2jsonredisserializer来支持对象存储。

@configuration
public class redisconfig {
    
    @bean
    public redismessagelistenercontainer redismessagelistenercontainer(
            redisconnectionfactory connectionfactory,
            redismessagesubscriber subscriber) {
        
        redismessagelistenercontainer container = new redismessagelistenercontainer();
        container.setconnectionfactory(connectionfactory);
        // 订阅站内信频道
        container.addmessagelistener(subscriber, new channeltopic("station:message"));
        return container;
    }
    
    @bean
    public redistemplate<string, object> redistemplate(redisconnectionfactory factory) {
        redistemplate<string, object> template = new redistemplate<>();
        template.setconnectionfactory(factory);
        template.setkeyserializer(new stringredisserializer());
        template.setvalueserializer(new jackson2jsonredisserializer<>(object.class));
        return template;
    }
}

五、消息实体

stationmessage是消息的载体,在redis中传输的json格式就对应这个结构。字段设计上:

  • id:消息唯一标识,可用于去重和消息历史记录。
  • type:区分私信和广播,方便订阅者做路由。
  • targetuserid:私信时的接收者,广播时可为空。
  • timestamp:时间戳,客户端可以用来做消息排序或展示发送时间。
@data
@builder
@noargsconstructor
@allargsconstructor
public class stationmessage {
    private string id;           // 消息id
    private string type;         // user:私信, broadcast:广播
    private string targetuserid; // 私信时的目标用户id
    private string content;      // 消息内容
    private string senderid;     // 发送者id
    private long timestamp;      // 时间戳
}

六、消息发送服务

messageservice封装了发送逻辑。核心动作很简单:构造stationmessage -> 序列化为json -> redistemplate.convertandsend()。发布之后,所有实例的订阅者都会收到消息,相当于redis帮我们做了一个“广播式”的跨实例通信。

这种设计的优点是:发送方不需要知道消息最终由哪个实例处理,也不需要维护实例之间的网络连接,所有协调工作都交给了redis。

@service
@slf4j
public class messageservice {
    
    @autowired
    private redistemplate<string, object> redistemplate;
    
    @autowired
    private sseemittermanager sseemittermanager;
    
    @autowired
    private objectmapper objectmapper;
    
    /**
     * 发送私信
     */
    public void sendprivatemessage(string fromuserid, string touserid, string content) {
        stationmessage msg = stationmessage.builder()
            .id(uuid.randomuuid().tostring())
            .type("user")
            .targetuserid(touserid)
            .senderid(fromuserid)
            .content(content)
            .timestamp(system.currenttimemillis())
            .build();
        
        try {
            string json = objectmapper.writevalueasstring(msg);
            // 发布到redis,所有实例都会收到
            redistemplate.convertandsend("station:message", json);
            log.info("private message sent: {} -> {}", fromuserid, touserid);
        } catch (jsonprocessingexception e) {
            log.error("failed to serialize message", e);
        }
    }
    
    /**
     * 全站广播
     */
    public void broadcast(string fromuserid, string content) {
        stationmessage msg = stationmessage.builder()
            .id(uuid.randomuuid().tostring())
            .type("broadcast")
            .senderid(fromuserid)
            .content(content)
            .timestamp(system.currenttimemillis())
            .build();
        
        try {
            string json = objectmapper.writevalueasstring(msg);
            redistemplate.convertandsend("station:message", json);
            log.info("broadcast message sent by: {}", fromuserid);
        } catch (jsonprocessingexception e) {
            log.error("failed to serialize broadcast", e);
        }
    }
}

七、controller层

controller对外暴露了三个核心接口:

  • get /api/sse/connect:客户端通过eventsource或fetch api调用这个接口建立sse连接。每个连接会生成一个唯一token,用于后续的清理。
  • post /api/sse/private:发送私信,需要提供发送者、接收者和消息内容。
  • post /api/sse/broadcast:发送广播消息。
  • get /api/sse/online-count:查询当前在线人数,可用于展示“在线状态”或做监控。

生产环境建议给这些接口加上认证鉴权(比如从token中解析userid),避免伪造身份。

@restcontroller
@requestmapping("/api/sse")
@slf4j
public class ssecontroller {
    
    @autowired
    private sseemittermanager sseemittermanager;
    
    @autowired
    private messageservice messageservice;
    
    /**
     * sse连接端点
     */
    @getmapping(value = "/connect", produces = mediatype.text_event_stream_value)
    public sseemitter connect(@requestparam string userid) {
        string token = uuid.randomuuid().tostring();
        return sseemittermanager.connect(userid, token);
    }
    
    /**
     * 发送私信
     */
    @postmapping("/private")
    public responseentity<?> sendprivate(@requestbody privatemessagerequest request) {
        messageservice.sendprivatemessage(
            request.getfromuserid(),
            request.gettouserid(),
            request.getcontent()
        );
        return responseentity.ok().build();
    }
    
    /**
     * 全站广播
     */
    @postmapping("/broadcast")
    public responseentity<?> broadcast(@requestbody broadcastrequest request) {
        messageservice.broadcast(request.getfromuserid(), request.getcontent());
        return responseentity.ok().build();
    }
    
    /**
     * 获取在线人数
     */
    @getmapping("/online-count")
    public responseentity<integer> getonlinecount() {
        return responseentity.ok(sseemittermanager.getonlinecount());
    }
}

以上就是基于springboot + redis pub/sub实现跨实例sse消息推送的详细内容,更多关于springboot跨实例sse消息推送的资料请关注代码网其它相关文章!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2026  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com