在使用 dify(假设为某种生成式 ai 模型或服务)结合 spring boot 和 webclient 实现流式输出时,我们需要确保技术栈的版本兼容性,并理解流式输出的核心概念。以下是详细讲解:
1. 技术栈版本要求
spring boot 版本要求
最低推荐版本:2.7.x 或 3.x
如果需要支持 http/2 或更高级别的异步处理能力,建议使用 spring boot 3.x。
spring boot 3.x 基于 spring framework 6.x 和 java 17+,提供了更好的反应式编程支持。
jdk 版本要求
最低推荐版本:java 11
spring boot 2.7.x 支持 java 8 及以上,但推荐使用 java 11 或更高版本。
如果使用 spring boot 3.x,则必须使用 java 17 或更高版本,因为 spring boot 3.x 已经停止支持 java 11 以下的版本。
2. 核心概念:流式输出
流式输出(streaming output)是指服务器以分块的方式逐步将数据发送到客户端,而不是一次性返回完整的结果。这种方式特别适合处理大文件传输、实时数据流或生成式模型的逐词输出。
在 spring boot 中,可以通过以下方式实现流式输出:
- 使用 responseentity<flux<?>> 或 responsebodyemitter(适用于同步场景)。
- 使用 webclient 的反应式编程模型来处理流式请求和响应。
3. 实现步骤
3.1 添加依赖
确保在 pom.xml 中添加以下依赖项:
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-webflux</artifactid>
</dependency>
spring-boot-starter-webflux 提供了反应式 web 编程的支持。
3.2 配置 webclient
创建一个 webclient 实例,主要用于设置跨域资源共享(cors, cross-origin resource sharing)。它的作用是解决前端和后端在不同域名或端口下通信时的跨域问题。
@configuration
public class webconfig implements webmvcconfigurer {
static final list<string> origin_list = arrays.aslist(
// 本地
"http://localhost:8080",
"http://127.0.0.1:8080",
"http://localhost:8888",
"http://127.0.0.1:8888",
"http://localhost:8803",
"http://127.0.0.1:8803"
);
@override
public void addcorsmappings(corsregistry registry) {
// 配置全局跨域规则
registry.addmapping("/**") // 允许所有路径的请求
.allowedorigins(origin_list.toarray(new string[0])) // 允许的源
.allowedmethods("get", "post", "put", "delete", "options") // 允许的http方法
.allowedheaders("content-type", "authorization") // 允许的请求头
.allowcredentials(true); // 是否允许发送cookie等凭证信息
}
}
3.3 实现流式输出控制器
@slf4j
@restcontroller
@requestmapping("/api")
@requiredargsconstructor
public class difycontroller {
@value("${portal.chatmessages}")
private string chatmessages;
private final difyservice difyservice;
@getmapping(value = "/chatmessagesstreaming", produces = "text/event-stream")
public flux<streamresponse> chatmessagesstreaming(httpservletrequest request,
@requestparam(value = "query", required = true) string query,
@requestparam(value = "username", required = true) string username,
@requestparam(value = "conversationid", required = false) string conversationid) throws exception {
return difyservice.streamingmessage(query, conversationid, username).doonnext(response -> {
log.info("流式结果:" + response.tostring());
//workflow_finished节点可以获取完整答案,进行你的逻辑处理
if (response.getevent().equals("workflow_finished")) {
log.info("进入workflow_finished阶段");
string answer = response.getdata().getoutputs().getanswer();//完整答案
}
//message_end结束节点,进行你的逻辑处理
if (response.getevent().equals("message_end")) {
log.info("进入message_end");
}
});
}
3.4 实现流式输出服务层
java
@slf4j
@service
@requiredargsconstructor
public class difyservice {
@value("${dify.url}")
private string url;
@value("${dify.key}")
private string apikey;
/**
* 流式调用dify.
*
* @param query 查询文本
* @param conversationid id
* @param username 用户名
* @return flux 响应流
*/
public flux<streamresponse> streamingmessage(string query, string conversationid, string username) {
//1.设置请求体
difyrequestbody body = new difyrequestbody();
body.setinputs(new hashmap<>());
body.setquery(query);
body.setresponsemode("streaming");
body.setconversationid("");
body.setuser(username);
if (stringutils.isnotempty(conversationid)) {
body.setconversationid(conversationid);
}
//如果存在自定义入参可以加到如下map中
//map<string, object> commoninputs = new hashmap<>();
//commoninputs.put("search_type", searchtype);
//body.setinputs(commoninputs);
//2.使用webclient发送post请求
return webclient.post()
.uri(url)
.headers(httpheaders -> {
httpheaders.setcontenttype(mediatype.application_json);
httpheaders.setbearerauth(apikey);
})
.bodyvalue(json.tojsonstring(body))
.retrieve()
.bodytoflux(streamresponse.class);//实体转换
.filter(this::shouldinclude) // 过滤掉不需要的数据【根据需求增加】
//.map(this::converttocustomresponseasync) // 异步转换【如果返回格式自定义则通过异步转换实现】
.onerrorresume(throwable -> {
log.info("异常输出:"+throwable.getmessage())
})
//.concatwith(mono.just(createcustomfinalmessage())); // 添加自定义的最终消息【根据需求增加】
}
private boolean shouldinclude(streamresponse streamresponse) {
// 示例:只要message节点的数据和message_end节点的数据
if (streamresponse.getevent().equals("message")
|| streamresponse.getevent().equals("message_end")) {
return true;
}
return false;
}
3.4 实现流式输出数据访问层
和dify返回流式输出格式一致
@data
public class streamresponse implements serializable {
/**
* 不同模式下的事件类型.
*/
private string event;
/**
* agent_thought id.
*/
private string id;
/**
* 任务id.
*/
private string task_id;
/**
* 消息唯一id.
*/
private string message_id;
/**
* llm 返回文本块内容.
*/
private string answer;
/**
* 创建时间戳.
*/
private long created_at;
/**
* 会话 id.
*/
private string conversation_id;
private streamresponsedata data;
}
@data
public class streamresponsedata implements serializable {
private string id;
private string workflow_id;
private string status;
private long created_at;
private long finished_at;
private outputsdata outputs;
}
@data
public class outputsdata implements serializable {
private string answer;
}
4. 关键点说明
1.mediatype.text_event_stream_value
表示使用 server-sent events (sse) 协议进行流式传输。
客户端可以通过浏览器或支持 sse 的工具(如 postman)接收流式数据。
2.flux
flux 是 reactor 库中的核心类型,表示一个可以包含零个或多个元素的异步序列。
在这里,flux 表示从 dify 接收到的逐词或逐句生成的文本流。
3.webclient 的反应式特性
webclient 是 spring 提供的反应式 http 客户端,能够高效处理流式数据。
它不会阻塞线程,而是通过事件驱动的方式逐步处理数据
总结
通过上述步骤,我们可以使用 spring boot 和 webclient 实现流式输出功能。关键在于利用反应式编程模型(reactor 的 flux 和 webclient),以及正确配置流式传输协议(如 sse)。根据需求选择合适的 spring boot 和 jdk 版本,可以确保项目的性能和稳定性。
到此这篇关于基于springboot和dify实现流式响应输出的文章就介绍到这了,更多相关springboot dify流式响应输出内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论