在现代 web 应用开发中,实时数据推送已成为许多业务场景的核心需求,例如实时通知、股票价格更新或在线聊天等。传统的轮询方式效率低下,而 sse(server-sent events,服务器推送事件)作为 html5 提供的一种轻量级技术,能够通过单向长连接实现服务器到客户端的实时数据推送。spring boot 3.4.3 结合 spring webflux,为开发者提供了响应式编程能力,使得实现 sse 功能更加简单高效。本文将详细介绍如何在 spring boot 3.4.3 中基于 spring webflux 实现 sse 功能,并提供完整的代码示例,助你在企业级应用中快速落地实时推送需求。
1. sse 简介
1.1 什么是 sse?
sse(server-sent events)是一种基于 http 协议的服务器推送技术,允许服务器主动向客户端发送数据。客户端通过建立长连接接收服务器推送的事件流,适用于需要实时更新的场景。与 websocket 相比,sse 是单向通信(仅服务器到客户端),实现更简单,且浏览器原生支持。
1.2 sse 的优点
- 轻量级:基于 http,无需额外协议支持。
- 简单性:实现成本低,适合单向推送场景。
- 自动重连:客户端断开后可自动尝试重连。
- 兼容性:html5 标准,主流浏览器均支持。
1.3 适用场景
- 实时通知(如消息提醒)。
- 数据监控(如服务器状态更新)。
- 单向事件流(如日志推送)。
2. spring webflux 简介
spring webflux 是 spring 5 引入的响应式 web 框架,与传统的 spring mvc 不同,它基于 reactor 项目,支持异步非阻塞的编程模型。spring boot 3.4.3 默认集成了 spring webflux,使得开发者可以轻松构建高并发、事件驱动的应用。结合 sse,webflux 的 flux
数据流非常适合处理实时推送需求。
3. 项目实战
以下是基于 spring boot 3.4.3 和 spring webflux 实现 sse 功能的完整步骤。
3.1 添加 maven 依赖
在 pom.xml
中添加 spring webflux 的依赖:
<?xml version="1.0" encoding="utf-8"?> <project xmlns="http://maven.apache.org/pom/4.0.0" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" xsi:schemalocation="http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelversion>4.0.0</modelversion> <parent> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-parent</artifactid> <version>3.4.3</version> </parent> <artifactid>springboot-sse</artifactid> <properties> <maven.compiler.source>17</maven.compiler.source> <maven.compiler.target>17</maven.compiler.target> <project.build.sourceencoding>utf-8</project.build.sourceencoding> </properties> <dependencies> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-webflux</artifactid> </dependency> </dependencies> </project>
说明:
spring-boot-starter-webflux
提供了 webflux 的核心功能和 reactor 依赖。- spring boot 3.4.3 使用 jdk 17,确保编译器版本匹配。
3.2 创建 sse 控制器
创建一个控制器,用于推送事件流:
package cn.joyous.sse.controller; import org.springframework.http.mediatype; import org.springframework.http.codec.serversentevent; import org.springframework.web.bind.annotation.getmapping; import org.springframework.web.bind.annotation.restcontroller; import reactor.core.publisher.flux; import java.time.duration; import java.util.concurrent.atomic.atomicinteger; /** */ @restcontroller public class ssecontroller { // 定义一个计数器,用于模拟数据变化 private final atomicinteger counter = new atomicinteger(0); @getmapping(path = "/sse", produces = mediatype.text_event_stream_value) public flux<serversentevent<integer>> streamevents() { return flux.interval(duration.ofseconds(1)) .map(seq -> serversentevent.<integer>builder() .id(string.valueof(seq)) .event("message") .data(counter.incrementandget()) .build()); } }
代码说明:
@getmapping(produces = mediatype.text_event_stream_value)
:指定返回 sse 事件流。flux.interval(duration.ofseconds(1))
:每秒生成一个事件。serversentevent.builder()
:构造 sse 事件,包含id
(事件标识)、event
(事件类型)和data
(推送数据)。
3.3 前端页面示例
创建一个简单的 html 页面,用于接收 sse 数据:
<!doctype html> <html lang="zh-cn"> <head> <meta charset="utf-8"> <title>sse demo</title> </head> <body> <h1>sse 数据推送</h1> <div id="result"></div> <script> const eventsource = new eventsource("http://localhost:8080/sse"); eventsource.onmessage = function(event) { document.getelementbyid("result").innertext += "收到数据: " + event.data + "\n"; }; eventsource.onerror = function() { console.log("sse 连接失败"); eventsource.close(); }; </script> </body> </html>
保存为 sse.html
,放置在 src/main/resources/static
目录下。
3.4 启动与测试
- 启动 spring boot 应用。
- 在浏览器中访问
http://localhost:8080/sse.html
,你将看到页面每秒更新一次计数器数据。 - 或者直接访问
http://localhost:8080/sse
,查看原始事件流:
id: 0 event: message data: 1 id: 1 event: message data: 2
4. 进阶功能(可选)
自定义事件类型
修改控制器支持多种事件:
@getmapping(path = "/sse", produces = mediatype.text_event_stream_value) public flux<serversentevent<string>> streamevents() { return flux.interval(duration.ofseconds(1)) .map(seq -> serversentevent.<string>builder() .id(string.valueof(seq)) .event(seq % 2 == 0 ? "even" : "odd") .data("count: " + counter.incrementandget()) .build()); }
动态推送特定用户
使用 map 存储用户连接:
private final map<string, sinks.many<string>> usersinks = new concurrenthashmap<>(); @getmapping("/sse/{userid}") public flux<serversentevent<string>> userstream(@pathvariable string userid) { sinks.many<string> sink = usersinks.computeifabsent(userid, k -> sinks.many().multicast().onbackpressurebuffer()); return sink.asflux().map(data -> serversentevent.builder().data(data).build()); }
异常处理
添加错误重连逻辑:
return flux.interval(duration.ofseconds(1)) .map(seq -> serversentevent.<integer>builder().data(counter.incrementandget()).build()) .onerrorresume(e -> flux.just(serversentevent.builder().data("error: " + e.getmessage()).build()));
5. 总结
spring boot 3.4.3 结合 spring webflux 实现 sse 功能,为实时数据推送提供了优雅的解决方案。通过本文的步骤,你可以快速搭建一个基于事件驱动的后端服务,满足实时通知或监控等需求。相比 websocket,sse 更轻量且易于集成,适合单向推送场景。
到此这篇关于spring boot 3.4.3 基于 spring webflux 实现 sse 功能的文章就介绍到这了,更多相关spring boot spring webflux sse内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论