

Spring WebFlux 是 Spring Framework 5 引入的响应式 Web 框架,基于 Project Reactor 实现,支持非阻塞、函数式编程模型。本节将深入探讨 WebFlux 的核心功能,包括 REST API 构建、响应式数据库访问和实时通信。
4.3.1 构建 reactive rest api

基础项目搭建
首先创建 spring boot webflux 项目(基于 spring initializr):
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
响应式控制器
注解式控制器(与传统spring mvc类似但支持响应式类型):
/**
 * Annotated REST controller exposing CRUD endpoints for products under {@code /products}.
 *
 * <p>Every handler returns a Reactor type ({@code Mono}/{@code Flux}) obtained from
 * {@code ProductService}; nothing in this class blocks.
 */
@RestController
@RequestMapping("/products")
public class ProductController {

    private final ProductService productService;

    // Constructor injection
    public ProductController(ProductService productService) {
        this.productService = productService;
    }

    /** Streams every product returned by the service. */
    @GetMapping
    public Flux<Product> getAllProducts() {
        return productService.findAll();
    }

    /**
     * Looks up a single product.
     *
     * @param id product identifier taken from the path
     * @return the product, or a 404 error signal when the lookup completes empty
     */
    @GetMapping("/{id}")
    public Mono<Product> getProductById(@PathVariable String id) {
        // An empty Mono would otherwise be rendered as "200 OK" with no body.
        return productService.findById(id)
                .switchIfEmpty(Mono.error(() -> new ResponseStatusException(HttpStatus.NOT_FOUND)));
    }

    /**
     * Creates a product and answers with 201.
     *
     * @param productMono the request body, decoded lazily
     * @return the saved product
     */
    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    public Mono<Product> createProduct(@RequestBody Mono<Product> productMono) {
        // save(...) is called with a Product (not a Mono) so this agrees with
        // ProductHandler.create, which uses flatMap(productService::save).
        return productMono.flatMap(productService::save);
    }

    /**
     * Updates an existing product.
     *
     * @param id          product identifier taken from the path
     * @param productMono the request body, decoded lazily
     * @return the updated product, or a 404 error signal when the update completes empty
     */
    @PutMapping("/{id}")
    public Mono<Product> updateProduct(
            @PathVariable String id,
            @RequestBody Mono<Product> productMono) {
        return productService.update(id, productMono)
                .switchIfEmpty(Mono.error(() -> new ResponseStatusException(HttpStatus.NOT_FOUND)));
    }

    /** Deletes a product and answers with 204 once the service call completes. */
    @DeleteMapping("/{id}")
    @ResponseStatus(HttpStatus.NO_CONTENT)
    public Mono<Void> deleteProduct(@PathVariable String id) {
        return productService.delete(id);
    }
}
函数式端点(RouterFunction方式):
/**
 * Functional-endpoint routing table: maps the {@code /fn/products} URLs onto
 * the corresponding {@code ProductHandler} methods.
 */
@Configuration
public class ProductRouter {

    /**
     * Builds the router function.
     *
     * @param handler the handler whose methods serve each route
     * @return a router covering list, get-by-id, create, update and delete
     */
    @Bean
    public RouterFunction<ServerResponse> route(ProductHandler handler) {
        // The builder's HTTP-verb methods are upper-case (GET/POST/PUT/DELETE).
        return RouterFunctions.route()
                .GET("/fn/products", handler::getAll)
                .GET("/fn/products/{id}", handler::getById)
                .POST("/fn/products", handler::create)
                .PUT("/fn/products/{id}", handler::update)
                .DELETE("/fn/products/{id}", handler::delete)
                .build();
    }
}
/**
 * Handler functions behind the {@code /fn/products} functional routes.
 * Each method turns a {@code ServerRequest} into a {@code Mono<ServerResponse>}.
 */
@Component
public class ProductHandler {

    private final ProductService productService;

    public ProductHandler(ProductService productService) {
        this.productService = productService;
    }

    /** Streams all products as newline-delimited JSON. */
    public Mono<ServerResponse> getAll(ServerRequest request) {
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_NDJSON)
                .body(productService.findAll(), Product.class);
    }

    /** Returns the product named by the {@code id} path variable, or 404 when none is found. */
    public Mono<ServerResponse> getById(ServerRequest request) {
        String id = request.pathVariable("id");
        return productService.findById(id)
                .flatMap(product -> ServerResponse.ok().bodyValue(product))
                .switchIfEmpty(ServerResponse.notFound().build());
    }

    /** Saves the request body and answers 201 with a Location header pointing at the new resource. */
    public Mono<ServerResponse> create(ServerRequest request) {
        return request.bodyToMono(Product.class)
                .flatMap(productService::save)
                .flatMap(product -> ServerResponse
                        .created(URI.create("/fn/products/" + product.getId()))
                        .bodyValue(product));
    }

    /** Updates the product named by the {@code id} path variable, or answers 404 when the update completes empty. */
    public Mono<ServerResponse> update(ServerRequest request) {
        String id = request.pathVariable("id");
        return request.bodyToMono(Product.class)
                .flatMap(product -> productService.update(id, Mono.just(product)))
                .flatMap(product -> ServerResponse.ok().bodyValue(product))
                .switchIfEmpty(ServerResponse.notFound().build());
    }

    /** Deletes the product named by the {@code id} path variable and answers 204. */
    public Mono<ServerResponse> delete(ServerRequest request) {
        String id = request.pathVariable("id");
        return productService.delete(id)
                .then(ServerResponse.noContent().build());
    }
}
高级特性

流式响应(server-sent events):
/**
 * Server-Sent Events endpoint.
 *
 * @return a stream that emits one {@code ProductEvent} per second, built from the
 *         interval's sequence number and the current instant
 */
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ProductEvent> streamProducts() {
    return Flux.interval(Duration.ofSeconds(1))
            .map(sequence -> new ProductEvent(
                    "product-" + sequence,
                    "event at " + Instant.now()));
}
请求验证与异常处理:
@controlleradvice
public class globalerrorhandler extends abstracterrorwebexceptionhandler {
public globalerrorhandler(errorattributes errorattributes,
webproperties.resources resources,
applicationcontext applicationcontext,
servercodecconfigurer servercodecconfigurer) {
super(errorattributes, resources, applicationcontext);
this.setmessagewriters(servercodecconfigurer.getwriters());
}
@override
protected routerfunction<serverresponse> getroutingfunction(errorattributes errorattributes) {
return routerfunctions.route(
requestpredicates.all(),
request -> {
map<string, object> errorproperties = geterrorattributes(request, errorattributeoptions.defaults());
httpstatus status = gethttpstatus(errorproperties);
return serverresponse.status(status)
.contenttype(mediatype.application_json)
.bodyvalue(errorproperties);
}
);
}
private httpstatus gethttpstatus(map<string, object> errorproperties) {
return httpstatus.valueof((integer)errorproperties.get("status"));
}
}
/**
 * Custom validation for {@code Product} instances.
 */
public final class ProductValidator {

    private ProductValidator() {
        // Static utility; not meant to be instantiated.
    }

    /**
     * Validates a product when the returned {@code Mono} is subscribed.
     *
     * @param product the product to check; {@code null} is reported as a validation error
     * @return a {@code Mono} emitting the same product when valid, otherwise failing
     *         with a {@code ValidationException} carrying every violation found
     */
    public static Mono<Product> validate(Product product) {
        return Mono.defer(() -> {
            if (product == null) {
                return Mono.error(new ValidationException(List.of("product is required")));
            }
            List<String> errors = new ArrayList<>();
            String name = product.getName();
            if (name == null || name.isBlank()) {
                errors.add("product name is required");
            }
            // price is a BigDecimal on the entity, so relational operators do not
            // apply; signum() is the sign test and a missing price is also rejected.
            BigDecimal price = product.getPrice();
            if (price == null || price.signum() <= 0) {
                errors.add("price must be positive");
            }
            if (!errors.isEmpty()) {
                return Mono.error(new ValidationException(errors));
            }
            return Mono.just(product);
        });
    }
}
4.3.2 响应式数据库访问(R2DBC)

r2dbc 配置
添加依赖:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-postgresql</artifactId>
    <scope>runtime</scope>
</dependency>
配置 application.yml:
spring:
  r2dbc:
    url: r2dbc:postgresql://localhost:5432/mydb
    username: user
    password: pass
    pool:
      enabled: true
      max-size: 20
响应式Repository
定义实体:
/**
 * Entity mapped to the {@code products} table.
 * Accessors are generated by Lombok's {@code @Data}.
 */
@Data
@Table("products")
public class Product {
    /** Primary key; matches the {@code Long} id type declared on ProductRepository. */
    @Id
    private Long id;
    private String name;
    private String description;
    private BigDecimal price;
    // Read and written by ProductService.transferStock via getStock()/setStock().
    // NOTE(review): assumes a NOT NULL integer "stock" column exists — confirm against the schema.
    private int stock;
    private Instant createdAt;
}
创建Repository接口:
/**
 * Reactive repository for {@code Product} rows keyed by {@code Long}.
 */
public interface ProductRepository extends ReactiveCrudRepository<Product, Long> {

    /** Derived query: products whose name contains the given fragment. */
    Flux<Product> findByNameContaining(String name);

    /** Products priced strictly above {@code minPrice}; the named parameter matches the method parameter. */
    @Query("select * from products where price > :minPrice")
    Flux<Product> findByPriceGreaterThan(BigDecimal minPrice);

    /**
     * Multiplies every product's price by {@code factor}.
     *
     * @return the value reported by the modifying query (declared as an {@code Integer} count)
     */
    @Modifying
    @Query("update products set price = price * :factor")
    Mono<Integer> updateAllPrices(BigDecimal factor);
}
复杂查询与事务

自定义查询实现:
/**
 * Hand-written query implementation backed by {@code DatabaseClient}.
 */
public class ProductRepositoryImpl implements CustomProductRepository {

    /**
     * Columns a caller may sort by. ORDER BY takes identifiers, which cannot be
     * supplied as bind parameters, so the requested field is checked against this
     * whitelist before being placed into the SQL text.
     * NOTE(review): column names are assumed from the Product fields — confirm against the schema.
     */
    private static final Set<String> SORTABLE_COLUMNS =
            Set.of("id", "name", "description", "price", "created_at");

    private static final String DEFAULT_SORT_COLUMN = "id";

    private final DatabaseClient databaseClient;

    public ProductRepositoryImpl(DatabaseClient databaseClient) {
        this.databaseClient = databaseClient;
    }

    /**
     * Searches products by name fragment and price range with sorting and paging.
     *
     * @param criteria name fragment, price bounds, sort field/direction, page number (1-based) and page size
     * @return matching rows mapped to {@code Product}
     */
    @Override
    public Flux<Product> complexSearch(ProductCriteria criteria) {
        String requestedColumn = Objects.toString(criteria.getSortField(), "").toLowerCase(Locale.ROOT);
        String sortColumn = SORTABLE_COLUMNS.contains(requestedColumn) ? requestedColumn : DEFAULT_SORT_COLUMN;
        // Anything other than "desc" sorts ascending, so no caller-supplied text reaches the SQL.
        String sortDirection =
                "desc".equalsIgnoreCase(Objects.toString(criteria.getSortDirection(), "")) ? "desc" : "asc";

        String sql = """
                select * from products
                where name like :name
                and price between :minprice and :maxprice
                order by %s %s
                limit :limit offset :offset
                """.formatted(sortColumn, sortDirection);

        // Page numbers below 1 are treated as the first page instead of yielding a negative offset.
        long offset = (long) (Math.max(criteria.getPageNumber(), 1) - 1) * criteria.getPageSize();

        return databaseClient.sql(sql)
                // A missing name becomes "%%", i.e. no name restriction, instead of "%null%".
                .bind("name", "%" + Objects.toString(criteria.getName(), "") + "%")
                .bind("minprice", criteria.getMinPrice())
                .bind("maxprice", criteria.getMaxPrice())
                .bind("limit", criteria.getPageSize())
                .bind("offset", offset)
                .map((row, metadata) -> toProduct(row))
                .all();
    }

    /**
     * Converts a result row into a {@code Product}.
     * NOTE(review): column names and types are assumed from the entity — confirm against the schema.
     */
    private Product toProduct(Row row) {
        Product product = new Product();
        product.setId(row.get("id", Long.class));
        product.setName(row.get("name", String.class));
        product.setDescription(row.get("description", String.class));
        product.setPrice(row.get("price", BigDecimal.class));
        product.setCreatedAt(row.get("created_at", Instant.class));
        return product;
    }
}
事务管理:
/**
 * Product operations that must run inside a reactive transaction.
 */
@Service
@RequiredArgsConstructor
public class ProductService {

    private final ProductRepository productRepository;
    private final TransactionalOperator transactionalOperator;

    /**
     * Moves {@code quantity} units of stock from one product to another in a single transaction.
     *
     * @param fromId   id of the product to take stock from (numeric, as text)
     * @param toId     id of the product to add stock to (numeric, as text)
     * @param quantity number of units to move; must be positive
     * @return a {@code Mono} completing when both rows are saved; it fails with
     *         {@code IllegalArgumentException} for a non-positive quantity,
     *         {@code NumberFormatException} for a non-numeric id,
     *         {@code NoSuchElementException} when either product is missing, or
     *         {@code InsufficientStockException} when the source has too little stock
     */
    public Mono<Void> transferStock(String fromId, String toId, int quantity) {
        if (quantity <= 0) {
            return Mono.error(new IllegalArgumentException("quantity must be positive: " + quantity));
        }
        Mono<Void> transfer = Mono.defer(() -> {
            // The repository is keyed by Long; parsing inside defer() turns a bad id into an error signal.
            Long sourceId = Long.valueOf(fromId);
            Long targetId = Long.valueOf(toId);
            return requireProduct(sourceId)
                    .flatMap(fromProduct -> {
                        if (fromProduct.getStock() < quantity) {
                            return Mono.error(new InsufficientStockException());
                        }
                        fromProduct.setStock(fromProduct.getStock() - quantity);
                        return productRepository.save(fromProduct);
                    })
                    // A missing target must fail (and roll back) rather than complete
                    // empty, which would commit the decrement alone and lose stock.
                    .then(Mono.defer(() -> requireProduct(targetId)))
                    .flatMap(toProduct -> {
                        toProduct.setStock(toProduct.getStock() + quantity);
                        return productRepository.save(toProduct);
                    })
                    .then();
        });
        // transactional(Mono) keeps the Mono shape; execute(...) would yield a Flux.
        return transfer.as(transactionalOperator::transactional);
    }

    /** Loads a product by id, failing with {@code NoSuchElementException} when it does not exist. */
    private Mono<Product> requireProduct(Long id) {
        return productRepository.findById(id)
                .switchIfEmpty(Mono.error(() -> new NoSuchElementException("Product not found: " + id)));
    }
}
性能优化
连接池配置:
spring:
  r2dbc:
    pool:
      max-size: 20
      initial-size: 5
      max-idle-time: 30m
批处理操作:
/**
 * Inserts all given products with one parameterized statement executed as a batch.
 *
 * <p>A {@code Statement} with {@code add()} between binding sets is used because
 * {@code Batch} only accepts complete SQL strings and has no {@code bind} method.
 *
 * @param products products to insert; name and price must be non-null
 *                 ({@code bind} rejects null values)
 * @return total number of rows reported as updated, or 0 for a null or empty list
 */
public Mono<Integer> batchInsert(List<Product> products) {
    if (products == null || products.isEmpty()) {
        return Mono.just(0);
    }
    return databaseClient.inConnection(connection -> {
        Statement statement =
                connection.createStatement("insert into products(name, price) values($1, $2)");
        Iterator<Product> remaining = products.iterator();
        while (remaining.hasNext()) {
            Product product = remaining.next();
            statement.bind(0, product.getName())
                    .bind(1, product.getPrice());
            // add() closes the current binding set and opens the next one, so it
            // goes between sets and is not called after the last one.
            if (remaining.hasNext()) {
                statement.add();
            }
        }
        return Flux.from(statement.execute())
                // getRowsUpdated() is itself a Publisher; Number covers both the
                // Integer and Long variants of the SPI.
                .flatMap(Result::getRowsUpdated)
                .map(Number::intValue)
                .reduce(0, Integer::sum);
    });
}
4.3.3 WebSocket 实时通信

基础websocket配置
配置类:
/**
 * STOMP-over-WebSocket broker configuration.
 *
 * <p>{@code WebSocketMessageBrokerConfigurer} callbacks are only invoked when
 * {@code @EnableWebSocketMessageBroker} is present; {@code @EnableWebFlux} does
 * not activate them.
 * NOTE(review): the STOMP message broker belongs to the servlet-based
 * spring-websocket module. A pure WebFlux application would instead register a
 * {@code WebSocketHandler} through a {@code HandlerMapping} — confirm which
 * stack this example targets.
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    /** Registers the {@code /ws} handshake endpoint with SockJS fallback, as the browser client expects. */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws")
                .setHandshakeHandler(new DefaultHandshakeHandler())
                // Origin patterns are used because a literal "*" in setAllowedOrigins is
                // rejected when credentials are allowed, which SockJS requires.
                // Wildcard origins are for demos only; restrict them in production.
                .setAllowedOriginPatterns("*")
                .withSockJS();
    }

    /** Enables the in-memory broker on {@code /topic} and routes {@code /app} messages to handlers. */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/topic");
        registry.setApplicationDestinationPrefixes("/app");
    }
}
响应式WebSocket处理
股票行情推送示例:
/**
 * Stock-quote push example: one shared quote stream exposed through two message mappings.
 *
 * <p>NOTE(review): the STOMP annotation handler is believed to adapt only
 * single-value reactive return types (e.g. {@code Mono}); a returned
 * {@code Flux} may not be streamed element by element. If that holds, subscribe
 * to the stream and publish each quote with {@code SimpMessagingTemplate}
 * instead — confirm against the Spring messaging documentation.
 */
@Controller
public class StockTickerController {

    private final Flux<StockQuote> stockQuoteFlux;

    /**
     * @param quoteGenerator produces one quote per tick
     */
    public StockTickerController(StockQuoteGenerator quoteGenerator) {
        this.stockQuoteFlux = Flux.interval(Duration.ofMillis(500))
                .map(sequence -> quoteGenerator.generate())
                .share(); // Hot publisher: all subscribers share the same data
    }

    /** Returns the shared quote stream, addressed to {@code /topic/stocks}. */
    @MessageMapping("stocks.subscribe")
    @SendTo("/topic/stocks")
    public Flux<StockQuote> subscribe() {
        return stockQuoteFlux;
    }

    /**
     * Returns only the quotes whose symbol equals the payload.
     *
     * @param symbol the ticker symbol sent by the client
     */
    @MessageMapping("stocks.filter")
    public Flux<StockQuote> filter(@Payload String symbol) {
        return stockQuoteFlux.filter(quote ->
                quote.getSymbol().equals(symbol));
    }
}
客户端连接示例:
// Browser client: opens a SockJS connection to /ws, subscribes to the quote
// topic, and sends a filter request once connected.
// SockJS, Stomp and JSON are case-sensitive globals.
const socket = new SockJS('/ws');
const stompClient = Stomp.over(socket);
stompClient.connect({}, () => {
    stompClient.subscribe('/topic/stocks', (message) => {
        const quote = JSON.parse(message.body);
        updateStockTable(quote);
    });
    stompClient.send("/app/stocks.filter", {}, "aapl");
});
高级特性

rsocket集成(更强大的响应式协议):
/**
 * RSocket endpoints for stock quotes; the class-level mapping prefixes every
 * method-level route with {@code stock.service}.
 */
@Controller
@MessageMapping("stock.service")
public class RSocketStockController {

    // The methods below all call stockService, so it must be declared and injected.
    // NOTE(review): the StockService type name is assumed from the field name — confirm.
    private final StockService stockService;

    public RSocketStockController(StockService stockService) {
        this.stockService = stockService;
    }

    /** One request, one response: the current quote for {@code symbol}. */
    @MessageMapping("current")
    public Mono<StockQuote> current(String symbol) {
        return stockService.getCurrent(symbol);
    }

    /** One request, many responses: the quote stream for {@code symbol}. */
    @MessageMapping("stream")
    public Flux<StockQuote> stream(String symbol) {
        return stockService.getStream(symbol);
    }

    /** Many requests, many responses: merges the quote streams of every symbol received. */
    @MessageMapping("channel")
    public Flux<StockQuote> channel(Flux<String> symbols) {
        return symbols.flatMap(stockService::getStream);
    }
}
背压控制:
/**
 * Streams large data with a bounded backpressure buffer of 50 chunks.
 *
 * <p>When the buffer is full, the newest chunk is dropped and logged. The
 * explicit {@code DROP_LATEST} strategy is what makes the log message true:
 * without a strategy, an overflow terminates the stream with an error instead
 * of dropping anything.
 */
@MessageMapping("large.data.stream")
public Flux<DataChunk> largeDataStream() {
    return dataService.streamLargeData()
            .onBackpressureBuffer(
                    50, // Buffer size
                    chunk -> log.warn("dropping chunk due to backpressure"),
                    BufferOverflowStrategy.DROP_LATEST);
}
安全配置
/**
 * WebFlux security: {@code /ws/**} requires authentication over HTTP Basic,
 * every other exchange is open, and CSRF protection is disabled.
 */
@Configuration
@EnableWebFluxSecurity
public class SecurityConfig {

    /**
     * Builds the filter chain with the lambda DSL, which replaces the chained
     * {@code .and()} style deprecated in recent Spring Security releases.
     */
    @Bean
    public SecurityWebFilterChain securityFilterChain(ServerHttpSecurity http) {
        return http
                .authorizeExchange(exchanges -> exchanges
                        .pathMatchers("/ws/**").authenticated()
                        .anyExchange().permitAll())
                .httpBasic(Customizer.withDefaults())
                .csrf(ServerHttpSecurity.CsrfSpec::disable)
                .build();
    }

    /**
     * In-memory user store with a single account.
     * The {@code {noop}} prefix stores the password in plain text: demo use only.
     */
    @Bean
    public MapReactiveUserDetailsService userDetailsService() {
        UserDetails user = User.withUsername("user")
                .password("{noop}password")
                .roles("user")
                .build();
        return new MapReactiveUserDetailsService(user);
    }
}
性能监控与最佳实践

监控端点配置
management:
  endpoints:
    web:
      exposure:
        include: health, metrics, prometheus
  metrics:
    tags:
      application: ${spring.application.name}
响应式应用监控
/**
 * Adds the common tag {@code application=reactive-demo} to every meter in the registry.
 */
@Bean
public MeterRegistryCustomizer<MeterRegistry> metricsCommonTags() {
    return registry -> registry.config()
            .commonTags("application", "reactive-demo");
}
/**
 * Custom metric: times every exchange and records it under {@code http.requests},
 * tagged with the request path, response status and HTTP method.
 *
 * <p>NOTE(review): tagging with the raw path can create one time series per
 * distinct URL (e.g. per id) — consider tagging with the route pattern instead.
 */
@Bean
public WebFilter metricsWebFilter(MeterRegistry registry) {
    return (exchange, chain) -> {
        String path = exchange.getRequest().getPath().toString();
        Timer.Sample sample = Timer.start(registry);
        return chain.filter(exchange)
                // doFinally runs on completion, error and cancellation. It replaces
                // doOnSuccessOrError, which newer Reactor releases no longer provide.
                .doFinally(signal -> {
                    // The status may be unset; held as Object so it works whichever
                    // status type the framework version returns.
                    Object status = exchange.getResponse().getStatusCode();
                    sample.stop(registry.timer("http.requests",
                            "uri", path,
                            "status", status != null ? status.toString() : "UNKNOWN",
                            // String.valueOf(getMethod()) replaces getMethodValue(),
                            // which newer Spring releases no longer provide.
                            "method", String.valueOf(exchange.getRequest().getMethod())));
                });
    };
}
最佳实践总结
线程模型理解:
- webflux 默认使用 netty 事件循环线程
- 阻塞操作必须使用 publishOn(或对阻塞的数据源使用 subscribeOn)切换到弹性线程池(如 Schedulers.boundedElastic())
背压策略选择:
- UI 客户端:使用 onBackpressureDrop 或 onBackpressureLatest
- 服务间通信:使用 onBackpressureBuffer 配合合理缓冲区大小
错误处理原则:
- 尽早处理错误
- 为每个 flux/mono 链添加错误处理
- 区分业务异常和系统异常
测试策略:
- 使用 StepVerifier 测试响应式流
- 使用 WebTestClient 测试控制器
- 虚拟时间测试长时间操作
性能调优:
- 合理配置连接池
- 监控关键指标(延迟、吞吐量、资源使用率)
- 使用响应式日志框架(如 logback 异步appender)
通过以上全面实践,您将能够构建高性能、可扩展的响应式 Web 应用,充分利用 WebFlux 的非阻塞特性,处理高并发场景下的各种挑战。
到此这篇关于spring webflux 深度实践指南的文章就介绍到这了,更多相关spring webflux内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论