1.引入依赖
如果使用了tomcat作为容器需要排除tomcat,webflux使用netty作为容器
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-webflux</artifactid>
<exclusions>
<exclusion>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-tomcat</artifactid>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupid>org.projectlombok</groupid>
<artifactid>lombok</artifactid>
</dependency>
2.定义请求类和接收类
aipromptdto 用于接收用户输入信息
@data
public class aipromptdto {
/**
* 大模型id
*/
private string serviceid;
/**
* 用户输入
*/
private string userinput;
/**
* sessionid
*/
private string sessionid;
/**
* 请求id
*/
private string requestid;
/**
* 获取token
*/
private string token;
}
答案接收对象
@data
@allargsconstructor
public class answerchunk {
/**
* 返回的内容
*/
private string content;
private string sessionid;
}
3.修改application.yml
此处配置response没有缓存,否则可能会阻塞,不会实时返回
reactor:
netty:
response:
buffer-size: 0
4.测试大模型获取数据格式
1.欢迎词
userinput:你好?
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"messageid":"d47cce80-bcf0-49fe-8e23-06bb5ab79af3","messagecontent":"消息1:我是一个聊天机器人,这里是我的消息"}
id:[done]
data:[done]
2.问答
userinput:物料00ny681的库存有多少个?
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"库"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"存"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"中"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"物"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"料"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"0"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"0"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"n"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"y"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"6"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"8"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"1"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"的"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"数"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"量"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"为"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"1"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"个"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"data":"。"}
id:7e42b18e-741f-4dc1-9d56-4e5688e71c29
data:{"inquirylist":"[\"物料00ny681的库存是否充足?\",\"物料00ny681的库存位置在哪里?\",\"如何补充物料00ny681的库存?\"]"}
id:[done]
data:[done]
5.定义service接口和实现类
webflux返回mono或者flux
public interface aiservice {
/**
* 根据请求获取流式返回的答案
* @param request
* @return
*/
flux<answerchunk> processstream(aipromptdto request);
}
实现类aiserviceimpl
import org.springframework.web.reactive.function.client.webclient;
@service
public class aiserviceimpl implements aiservice {
private final webclient webclient;
//初始化webclient,并ssl校验,生产环境不要跳过
public aiserviceimpl(webclient.builder webclientbuilder) {
// 使用insecuretrustmanagerfactory来信任所有证书
sslcontextbuilder sslcontextbuilder = sslcontextbuilder.forclient()
.trustmanager(insecuretrustmanagerfactory.instance);
httpclient httpclient = httpclient.create()
.secure(sslcontextspec -> sslcontextspec.sslcontext(sslcontextbuilder))
.responsetimeout(duration.ofminutes(timeout));
this.webclient = webclientbuilder.clientconnector(
new reactorclienthttpconnector(httpclient)
).baseurl(aiforceurl).build();
}
@override
public flux<answerchunk> processstream(aipromptdto request) {
string body = jsonutil.tojsonstr(request);//参数都转化为json字符串
return webclient.post()
.uri(aiforceurl + "/aiforceplatformapi/openapi/llm/debugsse")//大模型地址
.bodyvalue(body)//body参数
.header("token", request.gettoken())//设置请求头
.header("content-type", "application/json")
.retrieve()//retrieve 方法会从服务器响应中提取数据
.bodytoflux(string.class)//响应体解析为一个流式的 string 类型序列
.map(chunk -> {//解析数据以供存储
//system.out.println("chunk = " + chunk);
string content = "";
// 解析大模型返回数据
if (!chunk.contains("[done]")) {//结束标志
if (chunk.contains("inquirylist")) {//处理返回的关联查询列表
content = parsechunk(chunk);
finalanswer[0].setquerylist(content);
}else if (chunk.contains("messageid")&&chunk.contains("messagecontent")) {//处理返回提示message
parsemessage(chunk, messagemap);
} else if (chunk.contains("data")) {//处理返回的问题答案
content = parsechunk(chunk);
redistemplate.opsforvalue().append(request.getrequestid() + "_result", content);
} else if (chunk.contains("question")) {//处理返回question
//先删除
questionservice.deletequestionsbypreviousidandrequest(questionid, requestid);
//保存ai返回的question
} else if (chunk.contains("image")) {//处理图片
parseimages(chunk, imagesurl);
} else if (chunk.contains("referenceinfo")) {//处理参照信息
parsereference(chunk, aianswerreferencelist);
}
} else {
// 处理结束
end.set("[done]");
finalanswer[0].setstate("done");
}
if (stringutils.isempty(chunk)) {
chunk = "";
}
return new answerchunk(chunk, request.getrequestid());
})
.dooncomplete(() -> {//答案都完成后存储对应数据到数据库中
string finalcontent = redistemplate.opsforvalue().get(request.getrequestid() + "_result");
redistemplate.delete(request.getrequestid());
//保存答案
string returnanswer = "";
jsonobject answer1 = new jsonobject().putonce("data", finalcontent);
//具体实现
})
.onerrorresume(e -> {//错误情况处理
finalanswer[0].setstate("failed");
answerservice.saveorupdate(finalanswer[0]);
return flux.error(e);
});
}
}
6.定义controller
@restcontroller
@requestmapping("/aiagent")
public class aiforcecontroller {
/**
* 获取内容
*
* @param request mediatype.text_event_stream_value 流式输出,否则会一次返回
* charset=utf-8 字符集,不设置会乱码
* 注意:使用get会中文乱码
* @return
*/
@postmapping(value = "/stream", produces = mediatype.text_event_stream_value + ";charset=utf-8")
public flux<serversentevent<string>> streamresponse(@requestbody aiforcepromptdto request) {
return aiservice.processstream(request)
.limitrate(100) // 限制每秒最大请求数
.onbackpressurebuffer(100,//背压策略:缓冲区大小为 100
buffer -> {
logger.warn("backpressure buffer overflow, dropping {} items", buffer);
}).publishon(schedulers.boundedelastic(),1) // 单线程调度确保顺序
.flatmap(chunk -> { // 使用 flatmap 将一个异步流中的每个元素映射为另一个流,并将这些流合并为一个单一的流
string content = chunk.getcontent();
if (stringutils.isnotblank(content)) {
string processedcontent = content.replaceall("`{3}", "\n```"); // 规范代码块格式
return flux.just(serversentevent.<string>builder()
.id(request.getrequestid())
.data(processedcontent)
.build());
}
return flux.empty();//如果内容为空,就返回空的flux
}, 1) // 设置并发度为 1,确保逐条发送
.doonnext(event -> logger.info("streaming chunk: {}", event.data())); // 日志记录每次发送的数据
}
}
// flux<serversentevent<string>> 实现 sse(server-sent events),以便客户端可以实时接收服务器推送的消息
7.调用结果

注意:在部署时,如果使用到了nginx需要配置

- chunked_transfer_encoding off 关闭分块传输,会发送完整的数据
- proxy_buffering off #禁用代理缓冲,适用于流式传输
- gzip off ##关闭压缩,数据以未压缩的方式传输
- add_header cache-control “no-cache” header定义无缓存
- add_header x-accel-buffering no;##禁用 nginx 的缓冲功能,确保数据实时传输
到此这篇关于java使用webflux调用大模型实现智能对话的文章就介绍到这了,更多相关java智能对话内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论