springboot sseemitter流式输出
前言
最近做ai类的开发,看到各大ai模型的输出方式都是采取的一种eventstream的方式实现。
不是通常的等接口处理完成后,一次性返回。
而是片段式的处理完成一个分片,就立马告知前端做出处理;后续处理出新的片段则再次发送给客户端。
在spring框架中就有一个类似的方式实现。sseemitter。
sseemitter 简介
sseemitter 是在spring 4.2开始引入的,使用的话需要注意版本,不过springboot 2.x 是可以玩的。
测试demo
编写一段代码,循环返回给客户端。如下所示:
package cn.xj.controller;
import org.springframework.web.bind.annotation.getmapping;
import org.springframework.web.bind.annotation.requestmapping;
import org.springframework.web.bind.annotation.restcontroller;
import org.springframework.web.servlet.mvc.method.annotation.sseemitter;
import java.io.ioexception;
import java.util.concurrent.executor;
import java.util.concurrent.executors;
import java.util.concurrent.timeunit;
@restcontroller
@requestmapping("/sse/mitter")
public class ssemittercontroller {
@getmapping(value = "/stream", produces = "text/event-stream")
public sseemitter stream() {
// 设置默认超时时间 0l 表示无限
// 注意:这里的单位是 ms
sseemitter sseemitter = new sseemitter(30000l);
// 最好不要阻塞主线程
executors.newsinglethreadexecutor().execute(() -> {
try {
for (int i = 0; i < 10; i++) {
sseemitter.send("这只是一个流式输出案例:" + i);
timeunit.seconds.sleep(1);
}
// 通知客户端消息发送完毕
sseemitter.complete();
} catch (exception e) {
e.printstacktrace();
sseemitter.completewitherror(e);
}
});
return sseemitter;
}
}浏览器请求,打开控制台查看数据格式,如下所示:

注意点
异常一 responsebodyemitter is already set complete
这种问题通常是 设置超时时间timeout太小导致的。网上很多demo中说的这个单位是秒,但实际测试来看,单位应该是毫秒 ms。
补充:springboot中sse流式输出中止的核心代码
springboot中sse流式输出中止的核心代码
在大模型会话中,会有一个功能是停止生成功能。这个功能如果在前端实现,既取消监听后端的流式返回事件,会导致后端日志中报错连接中断等错误。
由此引出的需求,我的接口a中使用了sse流式返回,需要做一个接口b,b的功能是中止第一个接口的流式返回,以下是核心代码和思路:
方案一:需要借助redis,在输出时循环判定来解决。
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.data.redis.core.redistemplate;
import org.springframework.stereotype.controller;
import org.springframework.web.bind.annotation.requestmapping;
import org.springframework.web.bind.annotation.responsebody;
import org.springframework.web.servlet.mvc.method.annotation.sseemitter;
import javax.servlet.http.httpservletrequest;
import java.io.bufferedreader;
import java.io.ioexception;
import java.io.inputstreamreader;
import java.util.concurrent.timeunit;
@controller
public class mycontroller {
@autowired
private redistemplate<string, object> redistemplate;
@requestmapping("/startstreaming")
public sseemitter startstreaming(httpservletrequest request) throws ioexception {
string requestid = request.getid(); // 获取请求的唯一标识符
string key = "shouldstopstreaming_" + requestid; // 生成唯一的key
sseemitter emitter = new sseemitter();
bufferedreader bufferedreader = new bufferedreader(new inputstreamreader(/*输入流*/));
// sse输出逻辑
string line;
while ((line = bufferedreader.readline()) != null) {
boolean shouldstop = (boolean) redistemplate.opsforvalue().get(key);
if (shouldstop != null && shouldstop) {
break; // 检查shouldstopstreaming标志,若为true则中断循环
}
// 发送数据给客户端
emitter.send(line);
}
// 删除key,确保不再需要该key时将其移除
redistemplate.delete(key);
return emitter;
}
@requestmapping("/stopstreaming")
@responsebody
public string stopstreaming(httpservletrequest request) {
string requestid = request.getid(); // 获取请求的唯一标识符
string key = "shouldstopstreaming_" + requestid; // 生成唯一的key
// 设置shouldstopstreaming为true,终止流式输出
redistemplate.opsforvalue().set(key, true, 1, timeunit.hours); // 设置过期时间为1小时(可根据需要调整)
return "streaming stopped";
}
}a接口定期从redis中获取shouldstopstreaming的值,并检查是否应该中止流式输出。b接口使用redistemplate将shouldstopstreaming的值设置为true,以指示a接口中止输出。由于redis的操作是原子性的,并且redistemplate提供了线程安全的访问,这样可以确保多个线程之间的协调和线程安全性。
方案二:使用本地缓存,结合sseemitter特性实现(实际使用的此种方案)
private final map<string, sseemitter> ssecache = new concurrenthashmap<>(10);
## 对话接口中put一下前端随机生成的不唯一emitterid
ssecache.put(emitterid, emitter);
## 停止回答接口
@override
public void stop(string emitterid) {
if (ssecache.containskey(emitterid)) {
ssecache.get(emitterid).complete();
ssecache.remove(emitterid);
}
}到此这篇关于springboot sseemitter流式输出 的文章就介绍到这了,更多相关springboot sseemitter流式输出 内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论