背景与架构概览
在企业 crm 系统中,我们希望为业务人员提供一个内嵌的 ai 助手,让用户能直接在系统内输入问题、实时获取 ai 回答,而无需跳转到外部 ai 平台。
整体方案选型:
| 组件 | 说明 |
|---|---|
| open webui | 开源 llm 前端平台,提供 openai 兼容接口,支持多模型管理 |
| openai-java sdk | 官方 java 客户端,直接对接 openai 兼容 api |
| spring webflux | 响应式编程,支持 sse(server-sent events)流式推送 |
| redis | 缓存 token,避免每次请求都重新登录 open webui |
整体请求链路:
前端输入框 → post /v1/chat/completions
→ llmcontroller(sse 接口)
→ llmservice(获取 token + 构造请求)
→ openai java sdk(流式调用 open webui)
→ sse 逐块推送回前端
依赖与配置
maven 依赖
<!-- openai java 官方 sdk -->
<dependency>
<groupid>com.openai</groupid>
<artifactid>openai-java</artifactid>
<version>2.5.0</version>
</dependency>
<!-- spring webflux(sse 支持) -->
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-webflux</artifactid>
</dependency>
<!-- hutool(http 工具 + json 解析) -->
<dependency>
<groupid>cn.hutool</groupid>
<artifactid>hutool-all</artifactid>
<version>5.8.x</version>
</dependency>
<!-- spring data redis -->
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-data-redis</artifactid>
</dependency>application.yml 配置
open-web-ui: base-url: http://your-openwebui-host/api # open webui 的 openai 兼容接口地址
open webui 的 openai 兼容接口通常为 http://host/api/v1,请根据实际部署调整。
token 生命周期管理
设计思路
open webui 使用 jwt token 进行鉴权,token 有有效期(默认约数小时)。若每次调用 ai 接口都重新登录,不仅效率低下,还会对 open webui 服务产生不必要的登录压力。
因此,我们设计了一套 “先查缓存 → 有效直接用 → 过期再刷新” 的 token 自动管理机制,并通过 redis 实现跨实例共享。
tokenrepository 接口抽象
定义标准的增删查接口,便于后续替换为其他存储介质(如内存 map、数据库等)。
public interface tokenrepository {
openwebuitoken get(string key);
void save(string key, openwebuitoken value);
void delete(string key);
}
设计亮点:通过接口隔离存储实现,tokenmanager 不依赖具体存储技术,方便单元测试和替换。
redis 存储实现
public class redistokenrepository implements tokenrepository {
private string prefix = "openwebui:";
private final redistemplate<object, object> redistemplate;
public redistokenrepository(redistemplate<object, object> redistemplate) {
this.redistemplate = redistemplate;
}
public redistokenrepository(string prefix, redistemplate<object, object> redistemplate) {
this.prefix = prefix;
this.redistemplate = redistemplate;
}
@override
public openwebuitoken get(string key) {
return beanutil.copyproperties(
redistemplate.opsforvalue().get(prefix + key),
openwebuitoken.class
);
}
@override
public void save(string key, openwebuitoken value) {
redistemplate.opsforvalue().set(prefix + key, value);
}
@override
public void delete(string key) {
redistemplate.delete(prefix + key);
}
}
关键点说明:
- key 格式为
openwebui:{username},通过前缀做命名空间隔离,避免与其他 redis key 冲突。 - 使用
beanutil.copyproperties从 redis 返回的linkedhashmap反序列化为强类型对象,避免手动类型转换。
openwebuitoken 数据模型
@data
@builder
@allargsconstructor
@noargsconstructor
public class openwebuitoken implements serializable {
@schema(description = "认证token")
private string accesstoken;
@schema(description = "token过期时间(毫秒时间戳)")
private long expireat;
@schema(description = "用户名")
private string username;
@schema(description = "密码")
private string password;
/**
* 判断 token 是否仍然有效
*/
public boolean isvalid() {
return stringutils.isnotblank(accesstoken)
&& objects.nonnull(expireat)
&& timeutil.getlocaldatetime(expireat).isafter(localdatetime.now());
}
}
isvalid() 方法封装了有效性判断,token 有效的条件:
accesstoken不为空expireat不为空- 当前时间在过期时间之前
openwebuitokenmanager 核心管理器
这是整个 token 管理的核心组件,负责:
- 优先从缓存获取有效 token
- token 失效时,发起 http 请求重新登录 open webui 并刷新缓存
- 使用
@synchronized防止并发场景下的重复登录(双检锁模式)
@slf4j
public class openwebuitokenmanager {
/** token 默认有效期(秒),登录接口无 expires_at 时使用 */
private static final long token_expire_time = 60 * 60l;
private final string signinurl;
private final tokenrepository tokenrepository;
public openwebuitokenmanager(string signinurl, tokenrepository tokenrepository) {
this.signinurl = signinurl;
this.tokenrepository = tokenrepository;
}
/**
* 获取有效 token(优先缓存,缓存失效则重新登录)
*/
public string getvalidtoken(string username, string password) {
if (!stringutils.hastext(username) || !stringutils.hastext(password)) {
log.warn("用户名或密码为空");
return null;
}
// 1. 查询缓存
openwebuitoken cachedtoken = tokenrepository.get(username);
// 2. 缓存有效直接返回
if (cachedtoken != null && cachedtoken.isvalid()) {
log.debug("使用缓存 token: {}", username);
return cachedtoken.getaccesstoken();
}
// 3. 缓存失效,重新登录
log.info("token 过期或不存在,重新登录: {}", username);
return refreshtoken(username, password);
}
public string getvalidtoken(credential credential) {
if (objects.isnull(credential)) return null;
return getvalidtoken(credential.getusername(), credential.getpassword());
}
/**
* 刷新 token(加锁防并发,内部二次检查)
*/
@synchronized
public string refreshtoken(string username, string password) {
// 二次检查:加锁后再次确认缓存是否已被其他线程刷新
openwebuitoken cachedtoken = tokenrepository.get(username);
if (cachedtoken != null && cachedtoken.isvalid()) {
return cachedtoken.getaccesstoken();
}
try {
// 调用 open webui 登录接口
httpresponse response = httputil.createpost(signinurl)
.header("content-type", "application/json")
.body(jsonutil.tojsonstr(map.of("email", username, "password", password)))
.execute();
if (!response.isok() || !stringutils.hastext(response.body())) {
throw new runtimeexception("登录失败: " + response.getstatus());
}
// 解析响应
jsonobject responsedata = jsonutil.parseobj(response.body());
string token = responsedata.getstr("token");
// 计算过期时间:优先使用接口返回值,否则使用默认值
long expirestime = timeutil.getepochmilli(localdatetime.now().plusseconds(token_expire_time));
string expiresat = responsedata.get("expires_at").tostring();
if (stringutils.hastext(expiresat)) {
expirestime = long.parselong(expiresat) * 1000; // 秒 → 毫秒
}
// 构建并缓存 token
openwebuitoken webuitoken = openwebuitoken.builder()
.accesstoken(token)
.username(username)
.password(password)
.expireat(expirestime)
.build();
tokenrepository.save(username, webuitoken);
log.info("token 刷新成功: {}, 过期时间: {}", username, expirestime);
return token;
} catch (exception e) {
log.error("登录获取 token 失败: {}", username, e);
throw new runtimeexception("open webui 登录失败", e);
}
}
}
并发安全分析:
线程 a: getvalidtoken → 缓存失效 → refreshtoken(加锁) 线程 b: getvalidtoken → 缓存失效 → refreshtoken(等待锁) 线程 a: 登录成功,缓存 token,释放锁 线程 b: 获取到锁 → 二次检查缓存 → 发现 token 有效 → 直接返回
通过二次检查(double-check),避免了多个线程同时发起重复登录请求。
凭证获取
系统用户与 open webui 账号存在映射关系。credentialprovider 负责根据当前登录用户 id 查询其对应的 open webui 账号和密码。
@slf4j
@component
public class credentialprovider {
@resource
private sysuserservice sysuserservice;
/**
* 根据用户 id 获取 open webui 登录凭证
*/
public credential getcredential(string userid) {
sysuser sysuser = sysuserservice.getbyid(userid);
if (objects.isnull(sysuser)) {
throw new businessexception(401, "用户不存在");
}
// 用户的 ai 邮箱(即 open webui 账号)
string aiemail = sysuser.getaiemail();
// 密码规则:邮箱前缀 + 固定后缀
string password = aiemail.split("@")[0] + "ai++2025&";
return new credential(aiemail, password);
}
}
设计说明:
- 系统在用户表中存储
aiemail字段,与 open webui 用户一一对应。 - 密码采用固定规则生成,方便批量初始化 open webui 用户,同时保持一定的随机性。
- 这种设计使得系统用户与 ai 平台账号解耦,无需在系统中明文存储 ai 平台密码。
系统提示词管理
系统提示词(system prompt)决定了 ai 的角色定位和回答风格。为了方便维护多套提示词,我们将其以文本文件的形式存放在 classpath 中,通过枚举统一管理。
提示词枚举
@getter
@allargsconstructor
public enum systempromptenum {
default("default", "默认");
private final string code;
private final string message;
/**
* 根据 code 获取枚举,未找到时返回 default
*/
public static systempromptenum of(string code) {
for (systempromptenum value : values()) {
if (value.getcode().equals(code)) {
return value;
}
}
return default;
}
}
提示词加载器
@slf4j
public class systempromptloader {
private static final string system_prompt_path = "prompt/";
private static final string system_prompt_suffix = "-system-prompt.txt";
private systempromptloader() {}
/**
* 从 classpath 加载指定名称的系统提示词文件
* 文件路径:resources/prompt/{promptname}-system-prompt.txt
*/
public static string loadsystemprompt(string promptname) {
classpathresource resource = new classpathresource(
system_prompt_path + promptname + system_prompt_suffix
);
log.info("加载系统提示: {}", resource.getpath());
try (inputstream in = resource.getinputstream()) {
return new string(in.readallbytes(), charset.defaultcharset());
} catch (exception e) {
log.error("加载系统提示失败: {}", promptname, e);
return "";
}
}
public static string loadsystemprompt(systempromptenum promptenum) {
return loadsystemprompt(promptenum.getcode());
}
}
文件结构示例:
src/main/resources/
└── prompt/
└── default-system-prompt.txt ← 默认场景提示词default-system-prompt.txt 内容示例:
你是一名专业的企业 crm 智能助手,请根据用户的问题给出准确、简洁的回答。 回答时请使用中文,保持专业、友好的语气。
扩展新场景只需新增枚举值和对应文本文件,无需修改业务代码,符合开闭原则。
spring bean 配置
@configuration
public class openwebuiconfig {
@bean
public openwebuitokenmanager openwebuitokenmanager(
redistemplate<object, object> redistemplate) {
return new openwebuitokenmanager(
openwebuiconstant.login_url, // open webui 登录接口地址
new redistokenrepository(redistemplate) // redis token 存储
);
}
}
openwebuiconstant.login_url 参考值:
public class openwebuiconstant {
public static final string login_url = "http://your-openwebui-host/api/v1/auths/signin";
}
请求与响应模型
请求参数chatrequest
@data
public class chatrequest {
@schema(description = "场景标识(用于加载特定场景提示词),默认 default")
private string code = systempromptenum.default.getcode();
@notempty(message = "请输入文字...")
@schema(description = "用户输入内容")
private string prompt;
@schema(description = "模型名称,默认 qwen3-vl-8b-instruct")
private string model = "qwen3-vl-8b-instruct";
}
字段说明:
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
code | string | 否 | 场景标识,关联系统提示词,默认 default |
prompt | string | 是 | 用户输入的问题 |
model | string | 否 | 指定 open webui 中部署的模型名称 |
响应数据chatstreamingvo
@data
@allargsconstructor
@noargsconstructor
@builder
public class chatstreamingvo {
@schema(description = "本次推送的内容片段")
private string content;
@schema(description = "使用的模型名称")
private string model;
}
每个 sse 事件携带一个内容片段(content),前端拼接所有片段即可得到完整回答。
llmservice 流式对话核心实现
这是整个功能的核心,主要完成以下步骤:
- 加载当前场景的系统提示词
- 获取当前用户的 open webui 凭证和有效 token
- 使用 openai java sdk 构建流式请求
- 通过
flux+ sse 将响应片段逐块推送
@slf4j
@service
public class llmservice {
@resource
private openwebuitokenmanager openwebuitokenmanager;
@value("${open-web-ui.base-url}")
private string baseurl;
@resource
private credentialprovider credentialprovider;
public flux<serversentevent<chatstreamingvo>> chatstream(chatrequest chatrequest) {
// 1. 加载系统提示词
string systemprompt = systempromptloader.loadsystemprompt(
systempromptenum.of(chatrequest.getcode())
);
// 2. 获取当前登录用户的 open webui token
credential credential = credentialprovider.getcredential(
securityutils.getuser().getid()
);
string validtoken = openwebuitokenmanager.getvalidtoken(credential);
// 3. 构建 openai java 客户端(复用 open webui 兼容接口)
openaiclient aiclient = openaiokhttpclient.builder()
.baseurl(baseurl)
.apikey(validtoken) // 将 open webui token 作为 api key
.build();
// 4. 构建对话参数
chatcompletioncreateparams params = chatcompletioncreateparams.builder()
.model(chatrequest.getmodel())
.addsystemmessage(systemprompt) // 系统提示词
.addusermessage(chatrequest.getprompt()) // 用户输入
.build();
// 5. 流式调用 + flux 包装 + sse 封装
return flux.using(
// 创建流式响应资源
() -> aiclient.chat().completions().createstreaming(params),
// 将 stream<chatcompletionchunk> 转换为 flux<serversentevent>
streamresponse -> flux.fromstream(streamresponse.stream())
.map(chunk -> {
// 提取当前 chunk 中的文本内容
string content = chunk.choices().stream()
.findfirst()
.flatmap(choice -> choice.delta().content())
.orelse("");
return serversentevent.<chatstreamingvo>builder()
.data(new chatstreamingvo(content, chatrequest.getmodel()))
.build();
}),
// 流结束/出错/取消 时关闭资源,防止连接泄漏
streamresponse::close
)
// 切换到弹性线程池,避免阻塞事件循环线程
.subscribeon(schedulers.boundedelastic())
// 全局异常处理
.onerrorresume(e -> {
log.error("llm 流式对话异常", e);
throw new businessexception("llm 流式对话异常");
});
}
}
关键技术点详解
flux.using的资源管理模式
flux.using 是 reactor 提供的资源管理操作符,它的三个参数分别对应:
flux.using(
resourcesupplier, // 创建资源(流式响应对象)
sourcesupplier, // 使用资源生产数据
resourcecleanup // 资源清理(无论成功/失败/取消都会执行)
)
这里使用它来确保 streamresponse 对象(底层是一个 http 长连接)无论何种情况下都能被正确关闭,防止连接资源泄漏。
subscribeon(schedulers.boundedelastic())
openai sdk 的流式调用是阻塞的 i/o 操作,而 spring webflux 的事件循环线程(netty nio 线程)不允许被阻塞。通过 subscribeon 将订阅行为切换到 boundedelastic 线程池(专为阻塞 i/o 设计),避免阻塞主事件循环。
sse 数据格式
serversentevent 对象在 spring webflux 中会被序列化为标准的 sse 格式:
data: {"content":"你好","model":"qwen3-vl-8b-instruct"}
data: {"content":",有什么可以","model":"qwen3-vl-8b-instruct"}
data: {"content":"帮助您的?","model":"qwen3-vl-8b-instruct"}
llmcontroller 接口层
@tag(name = "llm 对话")
@restcontroller
@requestmapping("/v1/chat")
public class llmcontroller {
@resource
private llmservice llmservice;
@operation(summary = "llm 流式对话")
@preauthorize("@knifesecurity.authenticated()")
@postmapping(value = "/completions", produces = "text/event-stream")
public flux<serversentevent<chatstreamingvo>> chatstream(
@valid @requestbody chatrequest chatrequest) {
return llmservice.chatstream(chatrequest);
}
}
要点:
produces = "text/event-stream":声明接口返回 sse 格式,浏览器eventsourceapi 及 fetch 流均可消费。@preauthorize:接口鉴权,确保只有登录用户才能访问。- 返回
flux<serversentevent<...>>:spring webflux 会自动将其转换为持续推送的 sse 响应。
前端对接思路
前端通过 fetch api + readablestream 消费 sse,实时渲染流式内容:
async function sendchat(prompt) {
const response = await fetch('/v1/chat/completions', {
method: 'post',
headers: {
'content-type': 'application/json',
'authorization': `bearer ${token}`
},
body: json.stringify({ prompt, code: 'default', model: 'qwen3-vl-8b-instruct' })
});
const reader = response.body.getreader();
const decoder = new textdecoder('utf-8');
let answer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
// 解析 sse 数据行
const text = decoder.decode(value);
const lines = text.split('\n').filter(line => line.startswith('data:'));
for (const line of lines) {
const data = line.replace('data:', '').trim();
if (!data || data === '[done]') continue;
try {
const parsed = json.parse(data);
answer += parsed.content;
// 更新 ui 文本框
document.getelementbyid('answer').innertext = answer;
} catch (e) {
// 忽略非 json 行
}
}
}
}
vue 3 + element plus 示例(输入框 + 流式渲染):
<template>
<div class="ai-chat">
<el-input
v-model="prompt"
type="textarea"
:rows="3"
placeholder="输入你的问题..."
@keydown.ctrl.enter="sendchat"
/>
<el-button type="primary" :loading="loading" @click="sendchat">发送</el-button>
<div class="answer" v-if="answer">
<pre>{{ answer }}</pre>
</div>
</div>
</template>
<script setup>
import { ref } from 'vue'
import { useuserstore } from '@/store/user'
const prompt = ref('')
const answer = ref('')
const loading = ref(false)
const userstore = useuserstore()
async function sendchat() {
if (!prompt.value.trim()) return
loading.value = true
answer.value = ''
const response = await fetch('/v1/chat/completions', {
method: 'post',
headers: {
'content-type': 'application/json',
'authorization': `bearer ${userstore.token}`
},
body: json.stringify({ prompt: prompt.value })
})
const reader = response.body.getreader()
const decoder = new textdecoder()
try {
while (true) {
const { done, value } = await reader.read()
if (done) break
const lines = decoder.decode(value).split('\n')
for (const line of lines) {
if (!line.startswith('data:')) continue
const json = line.slice(5).trim()
if (!json || json === '[done]') continue
answer.value += json.parse(json).content
}
}
} finally {
loading.value = false
}
}
</script>整体流程图
┌────────────────────────────────────────────────────────────────────┐
│ 前端浏览器 │
│ 用户输入 prompt → fetch post /v1/chat/completions │
│ ← 逐块接收 sse 数据 → 拼接渲染到文本框 │
└────────────────────────────────┬───────────────────────────────────┘
│ http sse
┌────────────────────────────────▼───────────────────────────────────┐
│ llmcontroller │
│ @postmapping(produces = "text/event-stream") │
│ → 调用 llmservice.chatstream(chatrequest) │
└────────────────────────────────┬───────────────────────────────────┘
│
┌────────────────────────────────▼───────────────────────────────────┐
│ llmservice │
│ 1. systempromptloader.loadsystemprompt(code) 加载提示词 │
│ 2. credentialprovider.getcredential(userid) 获取 ai 凭证 │
│ 3. tokenmanager.getvalidtoken(credential) 获取有效 token │
│ 4. openaiokhttpclient.build(baseurl, token) 构建 sdk 客户端 │
│ 5. flux.using(createstreaming, mapchunks, close) 流式调用 │
└────┬───────────────────────────────────────────┬───────────────────┘
│ token 管理 │ ai 调用
┌────▼──────────────────────────┐ ┌─────────────▼──────────────────┐
│ openwebuitokenmanager │ │ open webui │
│ ┌──────────────────────────┐ │ │ post /api/v1/auths/signin │
│ │ redis 缓存查询 │ │ │ post /api/v1/chat/completions │
│ │ → 有效:直接返回 │ │ │ (openai 兼容接口) │
│ │ → 失效:登录刷新 + 缓存 │ │ │ │
│ └──────────────────────────┘ │ │ → 流式返回 chatcompletionchunk │
└───────────────────────────────┘ └────────────────────────────────┘
设计总结与经验
亮点设计
| 设计点 | 说明 |
|---|---|
| token 双检锁 | @synchronized + 内部二次校验,防止高并发下重复登录 |
| 接口隔离存储 | tokenrepository 接口 + redistokenrepository 实现,易替换、易测试 |
| 提示词文件化 | 提示词以 .txt 存放 classpath,通过枚举管理多场景,无需改代码 |
| 资源安全释放 | flux.using 三段式确保流式连接必然关闭,防止资源泄漏 |
| 线程模型正确 | subscribeon(schedulers.boundedelastic()) 避免阻塞 webflux 事件线程 |
| 用户凭证映射 | 系统用户与 ai 平台账号分离,凭证由 credentialprovider 统一管理 |
注意事项
token 有效期与 redis ttl 保持一致:建议在 save 时同步设置 redis key 的过期时间,避免 redis 中存放已过期 token 占用内存。
// 改进建议:设置 redis ttl long ttlseconds = (expirestime - system.currenttimemillis()) / 1000; redistemplate.opsforvalue().set(prefix + key, value, ttlseconds, timeunit.seconds);
open webui 模型名称与实际部署保持一致:chatrequest.model 的默认值 qwen3-vl-8b-instruct 需要与 open webui 中实际加载的模型 id 完全匹配,否则会返回 404。
系统提示词文件编码:建议统一使用 utf-8 编码保存 .txt 文件,避免中文乱码。
sse 连接超时:生产环境中建议配置 nginx 的 proxy_read_timeout 和 proxy_buffering off,确保 sse 长连接不被中断。
location /v1/chat/ {
proxy_pass http://backend;
proxy_buffering off;
proxy_read_timeout 120s;
proxy_set_header cache-control no-cache;
}ai 账号批量初始化:在用户表中添加 ai_email 字段后,需要在 open webui 中预先创建对应账号,或通过 open webui 管理接口批量创建。
小结
本文完整介绍了在企业 spring boot 项目中集成 open webui 的实践方案:
- 通过 redis 缓存 + 双检锁 实现 token 的自动管理与并发安全;
- 通过 枚举 + classpath 文本文件 实现多场景系统提示词的灵活管理;
- 通过 openai java sdk + spring webflux + sse 实现流式 ai 响应;
- 通过 fetch readablestream 在前端实时渲染流式输出。
这套架构可以方便地扩展到更多 ai 场景:代码审查、文案生成、智能客服等,只需新增场景枚举和对应提示词文件即可快速接入。
以上就是springboot集成open webui实现ai流式对话的详细内容,更多关于springboot ai流式对话的资料请关注代码网其它相关文章!
发表评论