当前位置: 代码网 > it编程>编程语言>Java > Spring WebFlux 核心作用

Spring WebFlux 核心作用

2025年08月11日 Java 我要评论
spring webflux 是 spring framework 5 引入的响应式 web 框架,基于 project reactor 实现,支持非阻塞、函数式编程模型。本节将深入探讨 webflu

spring webflux 是 spring framework 5 引入的响应式 web 框架,基于 project reactor 实现,支持非阻塞、函数式编程模型。本节将深入探讨 webflux 的核心功能,包括 rest api 构建、响应式数据库访问和实时通信。

4.3.1 构建 reactive rest api

基础项目搭建

首先创建 spring boot webflux 项目(基于 spring initializr):

<dependency>
    <groupid>org.springframework.boot</groupid>
    <artifactid>spring-boot-starter-webflux</artifactid>
</dependency>

响应式控制器

注解式控制器(与传统spring mvc类似但支持响应式类型):

@restcontroller
@requestmapping("/products")
public class productcontroller {
    private final productservice productservice;
    // 构造函数注入
    public productcontroller(productservice productservice) {
        this.productservice = productservice;
    }
    @getmapping
    public flux<product> getallproducts() {
        return productservice.findall();
    }
    @getmapping("/{id}")
    public mono<product> getproductbyid(@pathvariable string id) {
        return productservice.findbyid(id);
    }
    @postmapping
    @responsestatus(httpstatus.created)
    public mono<product> createproduct(@requestbody mono<product> productmono) {
        return productservice.save(productmono);
    }
    @putmapping("/{id}")
    public mono<product> updateproduct(
            @pathvariable string id, 
            @requestbody mono<product> productmono) {
        return productservice.update(id, productmono);
    }
    @deletemapping("/{id}")
    @responsestatus(httpstatus.no_content)
    public mono<void> deleteproduct(@pathvariable string id) {
        return productservice.delete(id);
    }
}

函数式端点(routerfunction方式):

@configuration
public class productrouter {
    @bean
    public routerfunction<serverresponse> route(producthandler handler) {
        return routerfunctions.route()
            .get("/fn/products", handler::getall)
            .get("/fn/products/{id}", handler::getbyid)
            .post("/fn/products", handler::create)
            .put("/fn/products/{id}", handler::update)
            .delete("/fn/products/{id}", handler::delete)
            .build();
    }
}
@component
public class producthandler {
    private final productservice productservice;
    public producthandler(productservice productservice) {
        this.productservice = productservice;
    }
    public mono<serverresponse> getall(serverrequest request) {
        return serverresponse.ok()
            .contenttype(mediatype.application_ndjson)
            .body(productservice.findall(), product.class);
    }
    public mono<serverresponse> getbyid(serverrequest request) {
        string id = request.pathvariable("id");
        return productservice.findbyid(id)
            .flatmap(product -> serverresponse.ok().bodyvalue(product))
            .switchifempty(serverresponse.notfound().build());
    }
    public mono<serverresponse> create(serverrequest request) {
        return request.bodytomono(product.class)
            .flatmap(productservice::save)
            .flatmap(product -> serverresponse
                .created(uri.create("/fn/products/" + product.getid()))
                .bodyvalue(product));
    }
    public mono<serverresponse> update(serverrequest request) {
        string id = request.pathvariable("id");
        return request.bodytomono(product.class)
            .flatmap(product -> productservice.update(id, mono.just(product)))
            .flatmap(product -> serverresponse.ok().bodyvalue(product))
            .switchifempty(serverresponse.notfound().build());
    }
    public mono<serverresponse> delete(serverrequest request) {
        string id = request.pathvariable("id");
        return productservice.delete(id)
            .then(serverresponse.nocontent().build());
    }
}

高级特性

流式响应(server-sent events):

@getmapping(value = "/stream", produces = mediatype.text_event_stream_value)
public flux<productevent> streamproducts() {
    return flux.interval(duration.ofseconds(1))
        .map(sequence -> new productevent(
            "product-" + sequence, 
            "event at " + instant.now()
        ));
}

请求验证与异常处理

@controlleradvice
public class globalerrorhandler extends abstracterrorwebexceptionhandler {
    public globalerrorhandler(errorattributes errorattributes, 
                            webproperties.resources resources,
                            applicationcontext applicationcontext,
                            servercodecconfigurer servercodecconfigurer) {
        super(errorattributes, resources, applicationcontext);
        this.setmessagewriters(servercodecconfigurer.getwriters());
    }
    @override
    protected routerfunction<serverresponse> getroutingfunction(errorattributes errorattributes) {
        return routerfunctions.route(
            requestpredicates.all(), 
            request -> {
                map<string, object> errorproperties = geterrorattributes(request, errorattributeoptions.defaults());
                httpstatus status = gethttpstatus(errorproperties);
                return serverresponse.status(status)
                    .contenttype(mediatype.application_json)
                    .bodyvalue(errorproperties);
            }
        );
    }
    private httpstatus gethttpstatus(map<string, object> errorproperties) {
        return httpstatus.valueof((integer)errorproperties.get("status"));
    }
}
// 自定义验证
public class productvalidator {
    public static mono<product> validate(product product) {
        return mono.just(product)
            .flatmap(p -> {
                list<string> errors = new arraylist<>();
                if (p.getname() == null || p.getname().isempty()) {
                    errors.add("product name is required");
                }
                if (p.getprice() <= 0) {
                    errors.add("price must be positive");
                }
                if (!errors.isempty()) {
                    return mono.error(new validationexception(errors));
                }
                return mono.just(p);
            });
    }
}

4.3.2 响应式数据库访问(r2dbc)

r2dbc 配置

添加依赖:

<dependency>
    <groupid>org.springframework.boot</groupid>
    <artifactid>spring-boot-starter-data-r2dbc</artifactid>
</dependency>
<dependency>
    <groupid>io.r2dbc</groupid>
    <artifactid>r2dbc-postgresql</artifactid>
    <scope>runtime</scope>
</dependency>

配置 application.yml:

spring:
  r2dbc:
    url: r2dbc:postgresql://localhost:5432/mydb
    username: user
    password: pass
    pool:
      enabled: true
      max-size: 20

响应式repository

定义实体:

@data
@table("products")
public class product {
    @id
    private long id;
    private string name;
    private string description;
    private bigdecimal price;
    private instant createdat;
}

创建repository接口:

public interface productrepository extends reactivecrudrepository<product, long> {
    flux<product> findbynamecontaining(string name);
    @query("select * from products where price > :minprice")
    flux<product> findbypricegreaterthan(bigdecimal minprice);
    @modifying
    @query("update products set price = price * :factor")
    mono<integer> updateallprices(bigdecimal factor);
}

复杂查询与事务

自定义查询实现

public class productrepositoryimpl implements customproductrepository {
    private final databaseclient databaseclient;
    public productrepositoryimpl(databaseclient databaseclient) {
        this.databaseclient = databaseclient;
    }
    @override
    public flux<product> complexsearch(productcriteria criteria) {
        return databaseclient.sql("""
                select * from products 
                where name like :name 
                and price between :minprice and :maxprice
                order by :sortfield :sortdirection
                limit :limit offset :offset
                """)
            .bind("name", "%" + criteria.getname() + "%")
            .bind("minprice", criteria.getminprice())
            .bind("maxprice", criteria.getmaxprice())
            .bind("sortfield", criteria.getsortfield())
            .bind("sortdirection", criteria.getsortdirection())
            .bind("limit", criteria.getpagesize())
            .bind("offset", (criteria.getpagenumber() - 1) * criteria.getpagesize())
            .map((row, metadata) -> toproduct(row))
            .all();
    }
    private product toproduct(row row) {
        // 行到对象的转换逻辑
    }
}

事务管理

@service
@requiredargsconstructor
public class productservice {
    private final productrepository productrepository;
    private final transactionaloperator transactionaloperator;
    public mono<void> transferstock(string fromid, string toid, int quantity) {
        return transactionaloperator.execute(status -> 
            productrepository.findbyid(fromid)
                .flatmap(fromproduct -> {
                    if (fromproduct.getstock() < quantity) {
                        return mono.error(new insufficientstockexception());
                    }
                    fromproduct.setstock(fromproduct.getstock() - quantity);
                    return productrepository.save(fromproduct)
                        .then(productrepository.findbyid(toid))
                        .flatmap(toproduct -> {
                            toproduct.setstock(toproduct.getstock() + quantity);
                            return productrepository.save(toproduct);
                        });
                })
        );
    }
}

性能优化

连接池配置

spring:
  r2dbc:
    pool:
      max-size: 20
      initial-size: 5
      max-idle-time: 30m

批处理操作

public mono<integer> batchinsert(list<product> products) {
    return databaseclient.inconnectionmany(connection -> {
        batch batch = connection.createbatch();
        products.foreach(product -> 
            batch.add("insert into products(name, price) values($1, $2)")
                .bind(0, product.getname())
                .bind(1, product.getprice())
        );
        return flux.from(batch.execute())
            .reduce(0, (count, result) -> count + result.getrowsupdated());
    });
}

4.3.3 websocket 实时通信

基础websocket配置

配置类:

@configuration
@enablewebflux
public class websocketconfig implements websocketmessagebrokerconfigurer {
    @override
    public void registerstompendpoints(stompendpointregistry registry) {
        registry.addendpoint("/ws")
            .sethandshakehandler(new defaulthandshakehandler())
            .setallowedorigins("*");
    }
    @override
    public void configuremessagebroker(messagebrokerregistry registry) {
        registry.enablesimplebroker("/topic");
        registry.setapplicationdestinationprefixes("/app");
    }
}

响应式websocket处理

股票行情推送示例

@controller
public class stocktickercontroller {
    private final flux<stockquote> stockquoteflux;
    public stocktickercontroller(stockquotegenerator quotegenerator) {
        this.stockquoteflux = flux.interval(duration.ofmillis(500))
            .map(sequence -> quotegenerator.generate())
            .share(); // 热发布,多个订阅者共享数据
    }
    @messagemapping("stocks.subscribe")
    @sendto("/topic/stocks")
    public flux<stockquote> subscribe() {
        return stockquoteflux;
    }
    @messagemapping("stocks.filter")
    public flux<stockquote> filter(@payload string symbol) {
        return stockquoteflux.filter(quote -> 
            quote.getsymbol().equals(symbol));
    }
}

客户端连接示例

const socket = new sockjs('/ws');
const stompclient = stomp.over(socket);
stompclient.connect({}, () => {
    stompclient.subscribe('/topic/stocks', (message) => {
        const quote = json.parse(message.body);
        updatestocktable(quote);
    });
    stompclient.send("/app/stocks.filter", {}, "aapl");
});

高级特性

rsocket集成(更强大的响应式协议):

@controller
@messagemapping("stock.service")
public class rsocketstockcontroller {
    @messagemapping("current")
    public mono<stockquote> current(string symbol) {
        return stockservice.getcurrent(symbol);
    }
    @messagemapping("stream")
    public flux<stockquote> stream(string symbol) {
        return stockservice.getstream(symbol);
    }
    @messagemapping("channel")
    public flux<stockquote> channel(flux<string> symbols) {
        return symbols.flatmap(stockservice::getstream);
    }
}

背压控制

@messagemapping("large.data.stream")
public flux<datachunk> largedatastream() {
    return dataservice.streamlargedata()
        .onbackpressurebuffer(50, // 缓冲区大小
            chunk -> log.warn("dropping chunk due to backpressure"));
}

安全配置

@configuration
@enablewebfluxsecurity
public class securityconfig {
    @bean
    public securitywebfilterchain securityfilterchain(serverhttpsecurity http) {
        return http
            .authorizeexchange()
                .pathmatchers("/ws/**").authenticated()
                .anyexchange().permitall()
            .and()
            .httpbasic()
            .and()
            .csrf().disable()
            .build();
    }
    @bean
    public mapreactiveuserdetailsservice userdetailsservice() {
        userdetails user = user.withusername("user")
            .password("{noop}password")
            .roles("user")
            .build();
        return new mapreactiveuserdetailsservice(user);
    }
}

性能监控与最佳实践

监控端点配置

management:
  endpoints:
    web:
      exposure:
        include: health, metrics, prometheus
  metrics:
    tags:
      application: ${spring.application.name}

响应式应用监控

@bean
public meterregistrycustomizer<meterregistry> metricscommontags() {
    return registry -> registry.config()
        .commontags("application", "reactive-demo");
}
// 自定义指标
@bean
public webfilter metricswebfilter(meterregistry registry) {
    return (exchange, chain) -> {
        string path = exchange.getrequest().getpath().tostring();
        timer.sample sample = timer.start(registry);
        return chain.filter(exchange)
            .doonsuccessorerror((done, ex) -> {
                sample.stop(registry.timer("http.requests", 
                    "uri", path,
                    "status", exchange.getresponse().getstatuscode().tostring(),
                    "method", exchange.getrequest().getmethodvalue()));
            });
    };
}

最佳实践总结

  1. 线程模型理解

    • webflux 默认使用 netty 事件循环线程
    • 阻塞操作必须使用 publishon 切换到弹性线程池
  2. 背压策略选择

    • ui 客户端:使用 onbackpressuredroponbackpressurelatest
    • 服务间通信:使用 onbackpressurebuffer 配合合理缓冲区大小
  3. 错误处理原则

    • 尽早处理错误
    • 为每个 flux/mono 链添加错误处理
    • 区分业务异常和系统异常
  4. 测试策略

    • 使用 stepverifier 测试响应式流
    • 使用 webtestclient 测试控制器
    • 虚拟时间测试长时间操作
  5. 性能调优

    • 合理配置连接池
    • 监控关键指标(延迟、吞吐量、资源使用率)
    • 使用响应式日志框架(如 logback 异步appender)

通过以上全面实践,您将能够构建高性能、可扩展的响应式 web 应用,充分利用 webflux 的非阻塞特性,处理高并发场景下的各种挑战。

到此这篇关于spring webflux 深度实践指南的文章就介绍到这了,更多相关spring webflux内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com