spring webflux 是 spring framework 5+ 引入的非阻塞、响应式 web 框架,旨在充分利用现代多核处理器和异步 i/o 模型(如 netty、undertow、servlet 3.1+ 容器),处理海量并发连接,特别适合低延迟、高吞吐量的应用场景。它是 spring mvc 的补充,为响应式编程模型提供了完整的支持。
核心功能:
非阻塞 & 响应式核心:
- 基于 project reactor(reactive streams 规范的实现),使用
flux
(0…n 个元素) 和mono
(0…1 个元素) 作为核心响应式类型。 - 整个请求处理链(controller -> service -> dao)在事件循环线程上运行,避免了为每个请求分配一个线程的传统阻塞模型,极大减少线程上下文切换开销和内存消耗。
- 通过异步 i/o 操作(如网络调用、数据库访问)释放线程,让线程去处理其他请求,提高资源利用率。
- 基于 project reactor(reactive streams 规范的实现),使用
函数式编程模型:
- 基于注解的控制器 (
@controller
,@restcontroller
): 与 spring mvc 风格类似,但方法返回类型是mono
/flux
/mono<void>
等响应式类型。 - 函数式端点 (router functions): 提供轻量级、函数式的编程模型,通过
routerfunction
和handlerfunction
定义路由和处理逻辑,更显式地控制请求处理流程,减少注解开销。
- 基于注解的控制器 (
响应式 http 客户端 (
webclient
):- 非阻塞、响应式的 http 客户端,替代传统的阻塞式
resttemplate
。 - 提供流畅的 api 进行声明式调用,支持异步、流式数据处理和背压。
- 非阻塞、响应式的 http 客户端,替代传统的阻塞式
响应式 websocket 支持:
- 提供处理 websocket 连接的 api,支持双向、全双工的流式通信,返回和消费
flux
/mono
。
- 提供处理 websocket 连接的 api,支持双向、全双工的流式通信,返回和消费
响应式 server-sent events (sse) 支持:
- 轻松实现服务器向客户端单向推送事件流的功能,控制器方法可以直接返回
flux<serversentevent>
。
- 轻松实现服务器向客户端单向推送事件流的功能,控制器方法可以直接返回
与响应式数据存储集成:
- 与响应式数据库驱动(如 r2dbc for sql, reactive mongodb, reactive cassandra, reactive redis)和响应式消息队列(如 reactor kafka, rabbitmq reactor)无缝集成。
背压 (backpressure) 支持:
- 响应式流的核心机制,允许消费者控制生产者的速度,防止下游被上游过快的数据淹没,确保系统稳定。
主要使用场景:
- 高并发 & 高吞吐量应用: 需要处理成千上万甚至百万级并发连接的场景(如实时聊天、游戏服务器、金融交易平台、物联网数据采集网关)。
- 低延迟要求: 需要极快响应时间的应用,非阻塞模型减少了线程阻塞带来的延迟。
- 流式数据处理: 处理持续不断的实时数据流(如传感器数据、日志流、股票行情、视频流处理)。
- 异步 & 非阻塞 i/o 密集型服务: 服务的主要瓶颈在于等待外部资源响应(如微服务间调用、数据库查询、外部 api 调用)。
- 资源受限环境: 需要节省内存和 cpu 资源的场景(如云原生环境、容器化部署),因为 webflux 通常比线程阻塞模型使用更少的线程。
- 构建响应式系统: 作为构建端到端响应式微服务架构的 web 层组件。
何时 不 首选 webflux?
- 简单 crud 应用: 并发量不高,使用熟悉的 spring mvc + 阻塞式数据库驱动开发更快、更简单。
- 强事务性、复杂阻塞逻辑: 如果业务逻辑本身是重度计算密集型(cpu bound)或者必须依赖阻塞式库(如 jdbc)且难以替换,强行用 webflux 可能收益不大甚至引入复杂度。
- 团队缺乏响应式经验: 响应式编程范式(如函数式、声明式、异步流处理)学习曲线较陡峭,调试也更复杂。
完整使用示例 (基于注解的 controller + reactive mongodb + webclient)
1. 依赖 (maven - pom.xml
):
<dependencies> <!-- spring boot starter webflux --> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-webflux</artifactid> </dependency> <!-- spring data reactive mongodb --> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-data-mongodb-reactive</artifactid> </dependency> <!-- project reactor (通常由 starter 带进来) --> <dependency> <groupid>io.projectreactor</groupid> <artifactid>reactor-core</artifactid> </dependency> </dependencies>
2. 实体类 (user.java
):
@data @noargsconstructor @allargsconstructor @document(collection = "users") // mongodb 集合映射 public class user { @id private string id; private string name; private string email; }
3. 响应式 repository (userrepository.java
):
public interface userrepository extends reactivemongorepository<user, string> { flux<user> findbyname(string name); // 响应式查询方法 }
4. service 层 (userservice.java
):
@service public class userservice { private final userrepository userrepository; private final webclient webclient; // 响应式 http 客户端 public userservice(userrepository userrepository, webclient.builder webclientbuilder) { this.userrepository = userrepository; this.webclient = webclientbuilder.baseurl("http://some-external-api.com").build(); } public flux<user> getallusers() { return userrepository.findall(); } public mono<user> getuserbyid(string id) { return userrepository.findbyid(id); } public mono<user> createuser(user user) { return userrepository.save(user); } public mono<user> updateuser(string id, user user) { return userrepository.findbyid(id) .flatmap(existinguser -> { existinguser.setname(user.getname()); existinguser.setemail(user.getemail()); return userrepository.save(existinguser); }); } public mono<void> deleteuser(string id) { return userrepository.deletebyid(id); } // 示例:使用 webclient 调用外部api并处理响应 public mono<string> fetchdatafromexternalapi() { return webclient.get() .uri("/data") .retrieve() .bodytomono(string.class) .onerrorresume(e -> mono.just("fallback data")); // 错误处理示例 } }
5. controller 层 (usercontroller.java
):
@restcontroller @requestmapping("/api/users") public class usercontroller { private final userservice userservice; public usercontroller(userservice userservice) { this.userservice = userservice; } // 获取所有用户 (返回 flux<user>) @getmapping public flux<user> getallusers() { return userservice.getallusers(); } // 根据id获取用户 (返回 mono<user>) @getmapping("/{id}") public mono<responseentity<user>> getuserbyid(@pathvariable string id) { return userservice.getuserbyid(id) .map(user -> responseentity.ok(user)) // 找到用户,返回200 ok .defaultifempty(responseentity.notfound().build()); // 用户不存在,返回404 } // 创建用户 (接受 mono<user> 请求体,返回 mono<user>) @postmapping @responsestatus(httpstatus.created) public mono<user> createuser(@requestbody mono<user> usermono) { return usermono.flatmap(userservice::createuser); } // 更新用户 @putmapping("/{id}") public mono<responseentity<user>> updateuser(@pathvariable string id, @requestbody mono<user> usermono) { return usermono.flatmap(user -> userservice.updateuser(id, user)) .map(updateduser -> responseentity.ok(updateduser)) .defaultifempty(responseentity.notfound().build()); } // 删除用户 (返回 mono<void> 表示操作完成) @deletemapping("/{id}") @responsestatus(httpstatus.no_content) public mono<void> deleteuser(@pathvariable string id) { return userservice.deleteuser(id); } // 示例:server-sent events (sse) 端点 - 模拟持续发送用户列表更新 @getmapping(value = "/stream", produces = mediatype.text_event_stream_value) public flux<user> streamuserupdates() { return flux.interval(duration.ofseconds(1)) // 每秒触发一次 .flatmap(tick -> userservice.getallusers().take(5)); // 每次发送前5个用户(模拟更新) } // 示例:调用外部api @getmapping("/external-data") public mono<string> getexternaldata() { return userservice.fetchdatafromexternalapi(); } }
6. 配置 (application.properties
or application.yml
):
# mongodb 连接配置 spring.data.mongodb.uri=mongodb://localhost:27017/mydatabase # (可选) 设置 netty 作为内嵌服务器 (默认通常是 netty) server.port=8080
示例说明:
- 依赖: 引入了 webflux、响应式 mongodb 和 reactor 的核心依赖。
- 实体 & repository: 定义了
user
实体和响应式的 spring data mongodb repository 接口。 - service:
- 注入
userrepository
进行响应式数据库操作 (findall
,findbyid
,save
,deletebyid
)。 - 注入
webclient
(通过webclient.builder
构造) 用于进行非阻塞的 http 调用。fetchdatafromexternalapi
方法展示了基本用法和错误处理 (onerrorresume
)。 - 所有 service 方法返回
flux
或mono
。
- 注入
- controller:
- 使用
@restcontroller
和@requestmapping
定义 rest 端点。 - 方法参数可以接受
@requestbody mono<user>
表示请求体是异步到达的。 - 方法返回类型都是
flux
或mono
。 getuserbyid
和updateuser
展示了如何处理可能为空的结果,返回不同的responseentity
状态码。deleteuser
返回mono<void>
并设置@responsestatus(httpstatus.no_content)
。streamuserupdates
方法:- 设置
produces = mediatype.text_event_stream_value
表明这是一个 sse 端点。 - 使用
flux.interval
每秒生成一个信号。 - 每次信号触发时,调用
userservice.getallusers().take(5)
获取(模拟的)最新用户数据(这里简单取了前5个)并发送给客户端。客户端会持续接收到事件流。
- 设置
getexternaldata
方法展示了如何在 controller 中调用 service 层的 webclient 功能。
- 使用
- 运行: 启动 spring boot 应用。应用默认会使用 netty 作为内嵌服务器。可以使用
curl
, postman 或浏览器(对于/api/users/stream
)测试各个端点。
关键注意事项:
- 非阻塞贯穿始终: 从 controller 到 service 到 repository (或 webclient) 的整个调用链必须是响应式和非阻塞的。如果在其中任何一环使用了阻塞操作(如传统的 jdbc 调用、
thread.sleep()
、同步 io),会破坏响应式模型的优势,甚至可能导致性能下降或线程饥饿。 - 学习曲线: 响应式编程范式(
flux
,mono
, 操作符如map
,flatmap
,filter
,zip
,onerrorresume
等)需要学习和适应。 - 调试: 调试响应式流比调试传统的命令式代码更具挑战性,堆栈跟踪可能不那么直观。利用 reactor 的调试工具(如
hooks.onoperatordebug()
)和日志记录很重要。 - 现有库兼容性: 确保你使用的所有库(数据库驱动、http 客户端、消息队列客户端等)都有响应式版本或支持非阻塞操作。使用阻塞库会破坏响应式链。
- 背压理解: 理解背压机制以及如何在你的应用中正确处理它是构建健壮响应式系统的关键。
总结:
spring webflux 是一个强大的框架,为构建高性能、可扩展、资源高效的异步和非阻塞 web 应用程序和微服务提供了现代解决方案。它特别适合处理高并发、低延迟和流式数据场景。在决定采用 webflux 时,务必权衡其优势(性能、可伸缩性)与挑战(学习曲线、调试、库兼容性),确保它适合你的具体应用需求和团队技能。上面的完整示例展示了如何使用注解模型结合响应式 mongodb 和 webclient 构建一个基本的 crud api,并包含 sse 和外部 api 调用的示例。
到此这篇关于spring webflux 功能介绍,使用场景,完整使用示例的文章就介绍到这了,更多相关spring webflux 使用内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论