spring webflux 是 spring framework 5 引入的响应式 web 框架,基于 project reactor 实现,支持非阻塞、函数式编程模型。本节将深入探讨 webflux 的核心功能,包括 rest api 构建、响应式数据库访问和实时通信。
4.3.1 构建 reactive rest api
基础项目搭建
首先创建 spring boot webflux 项目(基于 spring initializr):
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-webflux</artifactid> </dependency>
响应式控制器
注解式控制器(与传统spring mvc类似但支持响应式类型):
@restcontroller @requestmapping("/products") public class productcontroller { private final productservice productservice; // 构造函数注入 public productcontroller(productservice productservice) { this.productservice = productservice; } @getmapping public flux<product> getallproducts() { return productservice.findall(); } @getmapping("/{id}") public mono<product> getproductbyid(@pathvariable string id) { return productservice.findbyid(id); } @postmapping @responsestatus(httpstatus.created) public mono<product> createproduct(@requestbody mono<product> productmono) { return productservice.save(productmono); } @putmapping("/{id}") public mono<product> updateproduct( @pathvariable string id, @requestbody mono<product> productmono) { return productservice.update(id, productmono); } @deletemapping("/{id}") @responsestatus(httpstatus.no_content) public mono<void> deleteproduct(@pathvariable string id) { return productservice.delete(id); } }
函数式端点(routerfunction方式):
@configuration public class productrouter { @bean public routerfunction<serverresponse> route(producthandler handler) { return routerfunctions.route() .get("/fn/products", handler::getall) .get("/fn/products/{id}", handler::getbyid) .post("/fn/products", handler::create) .put("/fn/products/{id}", handler::update) .delete("/fn/products/{id}", handler::delete) .build(); } } @component public class producthandler { private final productservice productservice; public producthandler(productservice productservice) { this.productservice = productservice; } public mono<serverresponse> getall(serverrequest request) { return serverresponse.ok() .contenttype(mediatype.application_ndjson) .body(productservice.findall(), product.class); } public mono<serverresponse> getbyid(serverrequest request) { string id = request.pathvariable("id"); return productservice.findbyid(id) .flatmap(product -> serverresponse.ok().bodyvalue(product)) .switchifempty(serverresponse.notfound().build()); } public mono<serverresponse> create(serverrequest request) { return request.bodytomono(product.class) .flatmap(productservice::save) .flatmap(product -> serverresponse .created(uri.create("/fn/products/" + product.getid())) .bodyvalue(product)); } public mono<serverresponse> update(serverrequest request) { string id = request.pathvariable("id"); return request.bodytomono(product.class) .flatmap(product -> productservice.update(id, mono.just(product))) .flatmap(product -> serverresponse.ok().bodyvalue(product)) .switchifempty(serverresponse.notfound().build()); } public mono<serverresponse> delete(serverrequest request) { string id = request.pathvariable("id"); return productservice.delete(id) .then(serverresponse.nocontent().build()); } }
高级特性
流式响应(server-sent events):
@getmapping(value = "/stream", produces = mediatype.text_event_stream_value) public flux<productevent> streamproducts() { return flux.interval(duration.ofseconds(1)) .map(sequence -> new productevent( "product-" + sequence, "event at " + instant.now() )); }
请求验证与异常处理:
@controlleradvice public class globalerrorhandler extends abstracterrorwebexceptionhandler { public globalerrorhandler(errorattributes errorattributes, webproperties.resources resources, applicationcontext applicationcontext, servercodecconfigurer servercodecconfigurer) { super(errorattributes, resources, applicationcontext); this.setmessagewriters(servercodecconfigurer.getwriters()); } @override protected routerfunction<serverresponse> getroutingfunction(errorattributes errorattributes) { return routerfunctions.route( requestpredicates.all(), request -> { map<string, object> errorproperties = geterrorattributes(request, errorattributeoptions.defaults()); httpstatus status = gethttpstatus(errorproperties); return serverresponse.status(status) .contenttype(mediatype.application_json) .bodyvalue(errorproperties); } ); } private httpstatus gethttpstatus(map<string, object> errorproperties) { return httpstatus.valueof((integer)errorproperties.get("status")); } } // 自定义验证 public class productvalidator { public static mono<product> validate(product product) { return mono.just(product) .flatmap(p -> { list<string> errors = new arraylist<>(); if (p.getname() == null || p.getname().isempty()) { errors.add("product name is required"); } if (p.getprice() <= 0) { errors.add("price must be positive"); } if (!errors.isempty()) { return mono.error(new validationexception(errors)); } return mono.just(p); }); } }
4.3.2 响应式数据库访问(r2dbc)
r2dbc 配置
添加依赖:
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-data-r2dbc</artifactid> </dependency> <dependency> <groupid>io.r2dbc</groupid> <artifactid>r2dbc-postgresql</artifactid> <scope>runtime</scope> </dependency>
配置 application.yml:
spring: r2dbc: url: r2dbc:postgresql://localhost:5432/mydb username: user password: pass pool: enabled: true max-size: 20
响应式repository
定义实体:
@data @table("products") public class product { @id private long id; private string name; private string description; private bigdecimal price; private instant createdat; }
创建repository接口:
public interface productrepository extends reactivecrudrepository<product, long> { flux<product> findbynamecontaining(string name); @query("select * from products where price > :minprice") flux<product> findbypricegreaterthan(bigdecimal minprice); @modifying @query("update products set price = price * :factor") mono<integer> updateallprices(bigdecimal factor); }
复杂查询与事务
自定义查询实现:
public class productrepositoryimpl implements customproductrepository { private final databaseclient databaseclient; public productrepositoryimpl(databaseclient databaseclient) { this.databaseclient = databaseclient; } @override public flux<product> complexsearch(productcriteria criteria) { return databaseclient.sql(""" select * from products where name like :name and price between :minprice and :maxprice order by :sortfield :sortdirection limit :limit offset :offset """) .bind("name", "%" + criteria.getname() + "%") .bind("minprice", criteria.getminprice()) .bind("maxprice", criteria.getmaxprice()) .bind("sortfield", criteria.getsortfield()) .bind("sortdirection", criteria.getsortdirection()) .bind("limit", criteria.getpagesize()) .bind("offset", (criteria.getpagenumber() - 1) * criteria.getpagesize()) .map((row, metadata) -> toproduct(row)) .all(); } private product toproduct(row row) { // 行到对象的转换逻辑 } }
事务管理:
@service @requiredargsconstructor public class productservice { private final productrepository productrepository; private final transactionaloperator transactionaloperator; public mono<void> transferstock(string fromid, string toid, int quantity) { return transactionaloperator.execute(status -> productrepository.findbyid(fromid) .flatmap(fromproduct -> { if (fromproduct.getstock() < quantity) { return mono.error(new insufficientstockexception()); } fromproduct.setstock(fromproduct.getstock() - quantity); return productrepository.save(fromproduct) .then(productrepository.findbyid(toid)) .flatmap(toproduct -> { toproduct.setstock(toproduct.getstock() + quantity); return productrepository.save(toproduct); }); }) ); } }
性能优化
连接池配置:
spring: r2dbc: pool: max-size: 20 initial-size: 5 max-idle-time: 30m
批处理操作:
public mono<integer> batchinsert(list<product> products) { return databaseclient.inconnectionmany(connection -> { batch batch = connection.createbatch(); products.foreach(product -> batch.add("insert into products(name, price) values($1, $2)") .bind(0, product.getname()) .bind(1, product.getprice()) ); return flux.from(batch.execute()) .reduce(0, (count, result) -> count + result.getrowsupdated()); }); }
4.3.3 websocket 实时通信
基础websocket配置
配置类:
@configuration @enablewebflux public class websocketconfig implements websocketmessagebrokerconfigurer { @override public void registerstompendpoints(stompendpointregistry registry) { registry.addendpoint("/ws") .sethandshakehandler(new defaulthandshakehandler()) .setallowedorigins("*"); } @override public void configuremessagebroker(messagebrokerregistry registry) { registry.enablesimplebroker("/topic"); registry.setapplicationdestinationprefixes("/app"); } }
响应式websocket处理
股票行情推送示例:
@controller public class stocktickercontroller { private final flux<stockquote> stockquoteflux; public stocktickercontroller(stockquotegenerator quotegenerator) { this.stockquoteflux = flux.interval(duration.ofmillis(500)) .map(sequence -> quotegenerator.generate()) .share(); // 热发布,多个订阅者共享数据 } @messagemapping("stocks.subscribe") @sendto("/topic/stocks") public flux<stockquote> subscribe() { return stockquoteflux; } @messagemapping("stocks.filter") public flux<stockquote> filter(@payload string symbol) { return stockquoteflux.filter(quote -> quote.getsymbol().equals(symbol)); } }
客户端连接示例:
const socket = new sockjs('/ws'); const stompclient = stomp.over(socket); stompclient.connect({}, () => { stompclient.subscribe('/topic/stocks', (message) => { const quote = json.parse(message.body); updatestocktable(quote); }); stompclient.send("/app/stocks.filter", {}, "aapl"); });
高级特性
rsocket集成(更强大的响应式协议):
@controller @messagemapping("stock.service") public class rsocketstockcontroller { @messagemapping("current") public mono<stockquote> current(string symbol) { return stockservice.getcurrent(symbol); } @messagemapping("stream") public flux<stockquote> stream(string symbol) { return stockservice.getstream(symbol); } @messagemapping("channel") public flux<stockquote> channel(flux<string> symbols) { return symbols.flatmap(stockservice::getstream); } }
背压控制:
@messagemapping("large.data.stream") public flux<datachunk> largedatastream() { return dataservice.streamlargedata() .onbackpressurebuffer(50, // 缓冲区大小 chunk -> log.warn("dropping chunk due to backpressure")); }
安全配置
@configuration @enablewebfluxsecurity public class securityconfig { @bean public securitywebfilterchain securityfilterchain(serverhttpsecurity http) { return http .authorizeexchange() .pathmatchers("/ws/**").authenticated() .anyexchange().permitall() .and() .httpbasic() .and() .csrf().disable() .build(); } @bean public mapreactiveuserdetailsservice userdetailsservice() { userdetails user = user.withusername("user") .password("{noop}password") .roles("user") .build(); return new mapreactiveuserdetailsservice(user); } }
性能监控与最佳实践
监控端点配置
management: endpoints: web: exposure: include: health, metrics, prometheus metrics: tags: application: ${spring.application.name}
响应式应用监控
@bean public meterregistrycustomizer<meterregistry> metricscommontags() { return registry -> registry.config() .commontags("application", "reactive-demo"); } // 自定义指标 @bean public webfilter metricswebfilter(meterregistry registry) { return (exchange, chain) -> { string path = exchange.getrequest().getpath().tostring(); timer.sample sample = timer.start(registry); return chain.filter(exchange) .doonsuccessorerror((done, ex) -> { sample.stop(registry.timer("http.requests", "uri", path, "status", exchange.getresponse().getstatuscode().tostring(), "method", exchange.getrequest().getmethodvalue())); }); }; }
最佳实践总结
线程模型理解:
- webflux 默认使用 netty 事件循环线程
- 阻塞操作必须使用
publishon
切换到弹性线程池
背压策略选择:
- ui 客户端:使用
onbackpressuredrop
或onbackpressurelatest
- 服务间通信:使用
onbackpressurebuffer
配合合理缓冲区大小
- ui 客户端:使用
错误处理原则:
- 尽早处理错误
- 为每个 flux/mono 链添加错误处理
- 区分业务异常和系统异常
测试策略:
- 使用
stepverifier
测试响应式流 - 使用
webtestclient
测试控制器 - 虚拟时间测试长时间操作
- 使用
性能调优:
- 合理配置连接池
- 监控关键指标(延迟、吞吐量、资源使用率)
- 使用响应式日志框架(如 logback 异步appender)
通过以上全面实践,您将能够构建高性能、可扩展的响应式 web 应用,充分利用 webflux 的非阻塞特性,处理高并发场景下的各种挑战。
到此这篇关于spring webflux 深度实践指南的文章就介绍到这了,更多相关spring webflux内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论