在上一章节中,我们深入分析了spring ai的阻塞式请求与响应机制,并探讨了如何增强其记忆能力。今天,我们将重点讲解流式响应的概念与实现。毕竟,ai的流式回答功能与其交互体验密切相关,是提升用户满意度的重要组成部分。
基本用法
基本用法非常简单,只需增加一个 stream 方法即可实现所需功能。接下来,我们将通过代码示例来展示这一过程,帮助您更清晰地理解如何在实际应用中进行操作。请看以下代码:
@getmapping(value = "/ai-stream",produces = mediatype.application_octet_stream_value + ";charset=utf-8")
flux<string> generationbystream(@requestparam("userinput") string userinput) {
flux<string> output = chatclient.prompt()
.user(userinput)
.stream()
.content();
return output;
}在我们增加 stream 方法之后,返回的对象类型将不再是原来的阻塞式 callresponsespec,而是转换为非阻塞的 streamresponsespec。与此同时,返回的数据类型也由之前的 string 变更为 flux。
在深入探讨其具体应用之前,首先让我来介绍一下 flux 的概念与特性。
spring webflux的处理器实现
首先,在 webflux 中,处理器已经实现了非阻塞式的功能。这意味着,只要我们的代码返回一个 flux 对象,就能轻松实现响应功能。通过这种方式,应用程序能够高效地处理并发请求,而不会因阻塞操作而影响整体性能。
@override
public mono<void> handle(serverwebexchange exchange) {
if (this.handlermappings == null) {
return createnotfounderror();
}
if (corsutils.ispreflightrequest(exchange.getrequest())) {
return handlepreflight(exchange);
}
return flux.fromiterable(this.handlermappings)
.concatmap(mapping -> mapping.gethandler(exchange))
.next()
.switchifempty(createnotfounderror())
.onerrorresume(ex -> handleresultmono(exchange, mono.error(ex)))
.flatmap(handler -> handlerequestwith(exchange, handler));
}这里简单介绍一下 spring webflux,虽然这不是我们的重点,但了解其基本概念还是很有帮助的。spring webflux 是 spring 框架的一部分,专为构建反应式应用而设计。它支持异步和非阻塞的编程模型,使得处理高并发请求变得更加高效。以下是 webflux 的几个关键特性:
- 反应式编程:webflux 基于反应式编程模型,使用
mono和flux类型来处理数据流。mono表示零或一个元素,而flux则表示零个或多个元素。这种模型使得我们可以轻松处理异步数据流,从而提高代码的可读性和可维护性。 - 非阻塞 i/o:webflux 通过非阻塞的 i/o 操作(如 netty 或 servlet 3.1+ 容器)来实现高效的资源利用。与传统的阻塞 i/o 不同,webflux 在等待响应时能够释放线程,这样一来,就可以显著提高应用的并发能力,支持更多的同时请求而不增加线程开销。
了解这些特性将为后续的非阻塞式响应设计奠定基础,帮助我们更好地利用 webflux 的能力来提升应用性能。
源码分析
现在我们来详细看看我们的 content 是如何操作的。接下来的代码示例将展示具体的实现方式,帮助我们理解在 webflux 中如何处理数据流和响应:
public flux<string> content() {
return dogetfluxchatresponse(this.request).map(r -> {
if (r.getresult() == null || r.getresult().getoutput() == null
|| r.getresult().getoutput().getcontent() == null) {
return "";
}
return r.getresult().getoutput().getcontent();
}).filter(stringutils::haslength);
}这里的实现相对简单,主要是传入了一个函数。接下来,我们将深入分析 dogetfluxchatresponse 的代码实现,以便更好地理解其具体逻辑和运作方式:
private flux<chatresponse> dogetfluxchatresponse2(defaultchatclientrequestspec inputrequest) {
//此处省略重复代码
var fluxchatresponse = this.chatmodel.stream(prompt);
//此处省略重复代码
return advisedresponse;
}这里的代码逻辑与阻塞回答基本相同,唯一的不同之处在于它调用了 chatmodel.stream(prompt) 方法。接下来,我们将深入探讨 chatmodel.stream(prompt) 方法的具体实现和其背后的设计思路:
public flux<chatresponse> stream(prompt prompt) {
return flux.defercontextual(contextview -> {
//此处省略重复代码
flux<openaiapi.chatcompletionchunk> completionchunks = this.openaiapi.chatcompletionstream(request,
getadditionalhttpheaders(prompt));
//此处省略重复代码
flux<chatresponse> chatresponse = completionchunks.map(this::chunktochatcompletion)
.switchmap(chatcompletion -> mono.just(chatcompletion).map(chatcompletion2 -> {
//此处省略重复代码
return new chatresponse(generations, from(chatcompletion2, null));
}
}));
//此处省略重复代码
return new messageaggregator().aggregate(flux, observationcontext::setresponse);
});
}同样的逻辑在这里就不再赘述,我们将重点关注其中的区别。在这一部分,我们使用了 chatcompletionstream,而且与之前不同的是,这里不再使用 retrytemplate,而是引入了 webclient,这是一个能够接收事件流的工具类。
public flux<chatcompletionchunk> chatcompletionstream(chatcompletionrequest chatrequest,
multivaluemap<string, string> additionalhttpheader) {
assert.notnull(chatrequest, "the request body can not be null.");
assert.istrue(chatrequest.stream(), "request must set the stream property to true.");
atomicboolean isinsidetool = new atomicboolean(false);
return this.webclient.post()
.uri(this.completionspath)
.headers(headers -> headers.addall(additionalhttpheader))
.body(mono.just(chatrequest), chatcompletionrequest.class)
.retrieve()
.bodytoflux(string.class)
// cancels the flux stream after the "[done]" is received.
.takeuntil(sse_done_predicate)
// filters out the "[done]" message.
.filter(sse_done_predicate.negate())
.map(content -> modeloptionsutils.jsontoobject(content, chatcompletionchunk.class))
//此处省略一堆代码这段代码的主要目的是通过 webclient 向指定路径发起一个 post 请求,同时设置合适的请求头和请求体。在获取响应数据时,使用了事件流的方式(通过 bodytoflux 方法)来接收响应内容,并对数据进行过滤和转换,最终将其转化为 chatcompletionchunk 对象。
尽管其余的业务逻辑与之前相似,但有一点显著的区别,即整个流程的返回类型以及与 openai api 的调用方式都是非阻塞式的。
总结
在当今的数字时代,流式响应机制不仅提升了系统的性能,还在用户体验上扮演了关键角色。通过引入 flux 类型,spring webflux 的设计理念使得应用能够以非阻塞的方式处理并发请求,从而有效利用资源并减少响应延迟。
我们终于全面讲解了spring ai的基本操作,包括阻塞式回答、流式回答以及记忆增强功能。这些内容为我们深入理解其工作机制奠定了基础。接下来,我们将继续深入探索源码,重点分析回调函数、实体类映射等重要功能。
这将帮助我们更好地理解spring ai的内部运作原理,并为进一步的优化和定制化提供指导。
到此这篇关于深入探索spring ai:源码分析流式回答的文章就介绍到这了,更多相关spring ai流式回答内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论