
Baidu Wenxin Yiyan in Java with streaming output: a Spring Boot + SSE demo

August 5, 2024 · Java

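Reference: GitHub - mmciel/wenxin-api-java: a Java library for Baidu Wenxin Yiyan that supports Q&A and conversation, with both streaming and synchronous output. It provides a Spring Boot usage sample and extension points.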

1. Dependencies

<dependency>
    <groupId>com.baidu.aip</groupId>
    <artifactId>java-sdk</artifactId>
    <version>4.16.18</version>
</dependency>

2. Configure the API key and secret key
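
The configuration itself is not reproduced in the text. As a minimal sketch, assuming the two keys are exchanged for an access_token through Baidu's OAuth endpoint (the value that `assembleUrl` later appends to the request URL), the token URL could be assembled as follows. The class and method names are hypothetical and not part of any SDK.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of any SDK: builds the URL used to exchange
// an API key / secret key pair for an access_token.
class TokenUrlSketch {
    // Baidu's OAuth token endpoint (client_credentials grant).
    static final String TOKEN_ENDPOINT = "https://aip.baidubce.com/oauth/2.0/token";

    static String buildTokenUrl(String apiKey, String secretKey) {
        return TOKEN_ENDPOINT
                + "?grant_type=client_credentials"
                + "&client_id=" + URLEncoder.encode(apiKey, StandardCharsets.UTF_8)
                + "&client_secret=" + URLEncoder.encode(secretKey, StandardCharsets.UTF_8);
    }
}
```

Keeping both keys in external configuration rather than in source code is the usual choice; the sketch only shows how they end up in the token request.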

3. The main interface used

4. Format of the returned JSON
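
The sample payload is not reproduced in the text. The listener in this article reads three fields from every streamed chunk: `id`, `result` and `is_end`. The sketch below shows one way to pull those three fields out of a chunk. The class name and the payload values in the usage are made up for illustration, and the regex approach is only there to keep the example dependency-free; a real project would use a JSON library.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative only: extracts the three fields the listener relies on from one
// streamed chunk. Assumes flat JSON with no escaped quotes inside the values.
class ChunkSketch {
    final String id;
    final String result;
    final boolean isEnd;

    ChunkSketch(String id, String result, boolean isEnd) {
        this.id = id;
        this.result = result;
        this.isEnd = isEnd;
    }

    static ChunkSketch parse(String json) {
        return new ChunkSketch(
                stringField(json, "id"),
                stringField(json, "result"),
                "true".equals(booleanField(json, "is_end")));
    }

    // Returns the value of a string-valued field, or null if it is absent.
    private static String stringField(String json, String name) {
        Matcher m = Pattern.compile("\"" + name + "\"\\s*:\\s*\"([^\"]*)\"").matcher(json);
        return m.find() ? m.group(1) : null;
    }

    // Returns "true" or "false" for a boolean-valued field, or null if it is absent.
    private static String booleanField(String json, String name) {
        Matcher m = Pattern.compile("\"" + name + "\"\\s*:\\s*(true|false)").matcher(json);
        return m.find() ? m.group(1) : null;
    }
}
```

For example, `ChunkSketch.parse("{\"id\":\"as-demo\",\"is_end\":false,\"result\":\"Hello\"}")` yields a chunk whose `result` is `Hello` and whose `isEnd` is `false`.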

3. WenxinEventSourceListener: the event listener

Unlike the other interfaces, the result has to be wrapped in CompletionsResponse.Data here; otherwise the front-end page would have to handle a non-JSON format.
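
To make the idea concrete, here is a minimal, dependency-free sketch of such a wrapper: every text fragment becomes a small JSON object with a `text` field, so the page can always call `JSON.parse` on what it receives. `DataWrapperSketch` is a hypothetical stand-in, not the real `CompletionsResponse.Data` class; in the actual demo Spring serializes the wrapper object.

```java
// Hypothetical stand-in for the CompletionsResponse.Data wrapper:
// turns a raw text fragment into {"text":"..."} with JSON string escaping.
class DataWrapperSketch {
    static String wrap(String text) {
        StringBuilder sb = new StringBuilder("{\"text\":\"");
        for (char ch : text.toCharArray()) {
            switch (ch) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (ch < 0x20) {
                        // Remaining control characters must be written as \\uXXXX escapes.
                        sb.append(String.format("\\u%04x", (int) ch));
                    } else {
                        sb.append(ch);
                    }
            }
        }
        return sb.append("\"}").toString();
    }
}
```

Without the wrapper, a fragment such as a bare line of Markdown would reach the browser as plain text, and the page would need one code path for JSON events and another for raw strings.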

import cn.hutool.json.JSONUtil;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okhttp3.sse.EventSource;
import okhttp3.sse.EventSourceListener;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.Objects;

// ChatResponse and CompletionsResponse are project classes; their imports are omitted.
@Slf4j
public class WenxinEventSourceListener extends EventSourceListener {

    /** Number of chunks received so far; reported to the page as "tokens". */
    private long tokens;

    private final SseEmitter sseEmitter;

    public WenxinEventSourceListener(SseEmitter sseEmitter) {
        this.sseEmitter = sseEmitter;
    }

    @Override
    public void onOpen(EventSource eventSource, Response response) {
        log.info("SSE connection opened...");
    }

    @SneakyThrows
    @Override
    public void onEvent(EventSource eventSource, String id, String type, String data) {
        log.info("Received data: {}", data);
        // Kept from the OpenAI version of this listener: a literal "[done]" marker
        // is not JSON, so it has to be checked before parsing.
        if ("[done]".equals(data)) {
            finish();
            return;
        }
        ChatResponse bean = JSONUtil.parseObj(data).toBean(ChatResponse.class);
        tokens += 1;

        // Wrap the text in CompletionsResponse.Data so the page always receives JSON.
        CompletionsResponse.Data dataResult = new CompletionsResponse.Data();
        dataResult.setText(bean.getResult());
        try {
            // Forward the text first: the final chunk (is_end = true) may still carry text.
            sseEmitter.send(SseEmitter.event()
                    .id(bean.getId())
                    .data(dataResult)
                    .reconnectTime(3000));
        } catch (Exception e) {
            log.error("Failed to push the SSE message!", e);
            eventSource.cancel();
            return;
        }
        if (Boolean.TRUE.equals(bean.getIs_end())) {
            finish();
        }
    }

    /** Sends the token summary and the "[done]" marker, then closes the emitter. */
    private void finish() throws IOException {
        log.info("The returned data has ended");
        sseEmitter.send(SseEmitter.event()
                .id("[tokens]")
                .data("<br/><br/>tokens:" + tokens())
                .reconnectTime(3000));
        sseEmitter.send(SseEmitter.event()
                .id("[done]")
                .data("[done]")
                .reconnectTime(3000));
        // Close the SSE connection once the transfer is complete.
        sseEmitter.complete();
    }

    @Override
    public void onClosed(EventSource eventSource) {
        log.info("SSE connection closed...");
    }

    @SneakyThrows
    @Override
    public void onFailure(EventSource eventSource, Throwable t, Response response) {
        if (Objects.isNull(response)) {
            log.error("SSE connection error", t);
            eventSource.cancel();
            return;
        }
        ResponseBody body = response.body();
        if (Objects.nonNull(body)) {
            // Error payload example: {"error_code":110,"error_msg":"access token invalid or no longer valid"}
            log.error("SSE connection error, data: {}", body.string(), t);
        } else {
            log.error("SSE connection error, response: {}", response, t);
        }
        eventSource.cancel();
    }

    /**
     * @return the number of chunks received so far
     */
    public long tokens() {
        return tokens;
    }
}
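
The `id`, `type` and `data` arguments of the listener's event callback come from the text frames of the SSE stream: each event is a group of `field: value` lines terminated by a blank line. The following is a simplified sketch of that framing, written only to illustrate what the SSE client does before the callback runs. It handles `\n` line endings only and ignores the `retry` field; the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified SSE frame splitter: groups "id:", "event:" and "data:" lines
// into events, using a blank line as the event terminator.
class SseFrameSketch {
    static final class Event {
        final String id;
        final String type;
        final String data;

        Event(String id, String type, String data) {
            this.id = id;
            this.type = type;
            this.data = data;
        }
    }

    static List<Event> parse(String stream) {
        List<Event> events = new ArrayList<>();
        String id = null;      // the last seen id persists across events
        String type = null;    // reset after every event
        StringBuilder data = new StringBuilder();
        boolean hasData = false;
        for (String line : stream.split("\n", -1)) {
            if (line.isEmpty()) {
                // Blank line: dispatch the event if it carried any data.
                if (hasData) {
                    events.add(new Event(id, type, data.toString()));
                }
                type = null;
                data.setLength(0);
                hasData = false;
                continue;
            }
            if (line.startsWith(":")) {
                continue; // comment line
            }
            int colon = line.indexOf(':');
            String field = colon < 0 ? line : line.substring(0, colon);
            String value = colon < 0 ? "" : line.substring(colon + 1);
            if (value.startsWith(" ")) {
                value = value.substring(1); // a single leading space is not part of the value
            }
            switch (field) {
                case "id": id = value; break;
                case "event": type = value; break;
                case "data":
                    if (hasData) data.append('\n'); // multiple data lines are joined with \n
                    data.append(value);
                    hasData = true;
                    break;
                default: break; // unknown fields are ignored
            }
        }
        return events;
    }
}
```

In this sketch `type` stays `null` when a frame has no `event:` line, and each `data` value is the JSON chunk that the listener then parses.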

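4. WenxinClient: for streaming, the method to look at is streamChat. The streaming example I had found earlier on Qianfan returned a JSON response type, which is why the demo I wrote by hand before kept throwing exceptions.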

    public void streamChat(ChatBody chatBody, EventSourceListener eventSourceListener, ModelE modelE) {
        if (Objects.isNull(eventSourceListener)) {
            throw new WenxinException("Invalid argument: eventSourceListener must not be null");
        }
        // Request a streamed response instead of a single JSON document.
        chatBody.setStream(true);
        try {
            EventSource.Factory factory = EventSources.createFactory(this.okHttpClient);
            Request request = new Request.Builder().url(assembleUrl(modelE))
                    .post(RequestBody.create(MediaType.parse(ContentType.JSON.getValue()),
                            new ObjectMapper().writeValueAsString(chatBody))).build();
            // The call is asynchronous: chunks are delivered to the listener as they arrive.
            factory.newEventSource(request, eventSourceListener);
        } catch (Exception e) {
            log.error("Failed to serialize the request parameters:", e);
        }
    }

    private String assembleUrl(ModelE modelE) {
        // The access_token is refreshed on every call and appended as a query parameter.
        accessToken = wenxinConfig.refreshAccessToken();
        return modelE.getApiHost() + "?access_token=" + accessToken;
    }
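
For comparison, here is a dependency-free sketch of the same request shape using the JDK's `java.net.http` API (Java 11+) instead of OkHttp. It only builds the request. The class name is hypothetical, the host in the usage note is a placeholder rather than a real endpoint, and the JSON body is assumed to already contain `"stream": true`.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Builds a POST request of the same shape as streamChat: the access_token goes
// into the query string and the chat body is sent as JSON.
class StreamRequestSketch {
    static HttpRequest build(String apiHost, String accessToken, String jsonBody) {
        return HttpRequest.newBuilder(URI.create(apiHost + "?access_token=" + accessToken))
                .header("Content-Type", "application/json")
                // Ask for an event stream rather than a single JSON document.
                .header("Accept", "text/event-stream")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }
}
```

A request built with `StreamRequestSketch.build("https://example.invalid/chat", token, body)` could then be sent with `HttpClient.send` and `HttpResponse.BodyHandlers.ofLines()` to read the stream line by line; the OkHttp SSE factory used in the article does that framing work itself.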

5. Define the SSE interface and its implementation

public interface SseService {
    /**
     * Creates an SSE connection.
     *
     * @param uid id of the client
     * @return the emitter bound to that client
     */
    SseEmitter createSse(String uid);

    /**
     * Closes an SSE connection.
     *
     * @param uid id of the client
     */
    void closeSse(String uid);

    /**
     * Sends a message from the client to the server.
     *
     * @param uid         id of the client
     * @param chatRequest the chat request
     */
    ChatResponse sseChat(String uid, ChatRequest chatRequest);
}

@Slf4j
@Service
public class WenxinSseServiceImpl implements SseService {
    @Value("${chat.accesskeyid}")
    private String accessKeyId;
    @Value("${chat.accesskeysecret}")
    private String accessKeySecret;
    @Value("${chat.agentkey}")
    private String agentKey;
    @Value("${chat.appid}")
    private String appId;

    @Autowired
    WenxinClient wenxinClient;

    @Override
    public SseEmitter createSse(String uid) {
        // The default timeout is 30 seconds; 0L means the emitter never times out.
        SseEmitter sseEmitter = new SseEmitter(0L);
        // Completion callback
        sseEmitter.onCompletion(() -> {
            log.info("[{}] connection finished...................", uid);
            LocalCache.CACHE.remove(uid);
        });
        // Timeout callback
        sseEmitter.onTimeout(() -> {
            log.info("[{}] connection timed out...................", uid);
        });
        // Error callback
        sseEmitter.onError(
                throwable -> {
                    try {
                        log.info("[{}] connection error, {}", uid, throwable.toString());
                        sseEmitter.send(SseEmitter.event()
                                .id(uid)
                                .name("An error occurred!")
                                .data(Message.builder().content("An error occurred, please retry!").build())
                                .reconnectTime(3000));
                        LocalCache.CACHE.put(uid, sseEmitter);
                    } catch (IOException e) {
                        log.error("[{}] failed to report the error to the client", uid, e);
                    }
                }
        );
        try {
            // Initial event: only tells the client how long to wait before reconnecting.
            sseEmitter.send(SseEmitter.event().reconnectTime(5000));
        } catch (IOException e) {
            log.error("[{}] failed to send the initial SSE event", uid, e);
        }
        LocalCache.CACHE.put(uid, sseEmitter);
        log.info("[{}] SSE connection created!", uid);
        return sseEmitter;
    }

    @Override
    public void closeSse(String uid) {
        SseEmitter sse = (SseEmitter) LocalCache.CACHE.get(uid);
        if (sse != null) {
            sse.complete();
            // Remove it from the cache
            LocalCache.CACHE.remove(uid);
        }
    }

    @Override
    public ChatResponse sseChat(String uid, ChatRequest chatRequest) {

        if (StringUtils.isBlank(chatRequest.getMsg())) {
            log.error("Invalid argument, msg is blank, uid: [{}]", uid);
            throw new BaseException("Invalid argument: msg must not be empty");
        }

        SseEmitter sseEmitter = (SseEmitter) LocalCache.CACHE.get(uid);

        if (sseEmitter == null) {
            log.info("Failed to push the chat message, uid: [{}]. No connection has been created, please retry.", uid);
            throw new BaseException("Failed to push the chat message, uid: [" + uid + "]. No connection has been created, please retry.");
        }

        WenxinEventSourceListener listener = new WenxinEventSourceListener(sseEmitter);

        List<MessageItem> messages = new ArrayList<>();
        messages.add(MessageItem.builder().role(MessageItem.Role.USER).content(chatRequest.getMsg()).build());
        // Note: the streamChat shown in the previous step takes a ChatBody; this call
        // relies on a variant of WenxinClient.streamChat that accepts the message list directly.
        wenxinClient.streamChat(messages, listener, ModelE.ERNIE_BOT);

        LocalCache.CACHE.put("msg" + uid, JSONUtil.toJsonStr(messages), LocalCache.TIMEOUT);

        ChatResponse response = new ChatResponse();
        response.setQuestionTokens(1);

        return response;
    }
}
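
The `LocalCache` used by the service is not shown in the article. As a rough idea of the role it plays, the sketch below keeps one connection object per `uid` in a thread-safe map. The class name and its methods are hypothetical, and unlike the cache in the article it has no expiry; it is generic so that it can be tried without Spring on the classpath.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for LocalCache: maps a client uid to its open connection
// (an SseEmitter in the real service). Safe to call from multiple request threads.
class ConnectionRegistrySketch<T> {
    private final Map<String, T> connections = new ConcurrentHashMap<>();

    // Stores (or replaces) the connection for this uid.
    void register(String uid, T connection) {
        connections.put(uid, connection);
    }

    // Looks up the connection; empty if the client never connected or was removed.
    Optional<T> find(String uid) {
        return Optional.ofNullable(connections.get(uid));
    }

    // Removes and returns the connection so the caller can close it.
    Optional<T> remove(String uid) {
        return Optional.ofNullable(connections.remove(uid));
    }

    int size() {
        return connections.size();
    }
}
```

With a registry like this, the chat method would call `find(uid)` and reject the request when the result is empty, which corresponds to the "no connection has been created" branch in the service.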

6. The main controller endpoints

    /**
     * Creates the SSE connection.
     *
     * @param headers request headers; the client id is read from them
     * @return the emitter for this client
     */
    @CrossOrigin
    @GetMapping("/createsse")
    public SseEmitter createConnect(@RequestHeader Map<String, String> headers) {
        String uid = getUid(headers);
        return sseService.createSse(uid);
    }

    /**
     * Chat endpoint.
     *
     * @param chatRequest the chat request
     * @param headers     request headers; the client id is read from them
     */
    @CrossOrigin
    @PostMapping("/chat")
    @ResponseBody
    public ChatResponse sseChat(@RequestBody ChatRequest chatRequest, @RequestHeader Map<String, String> headers, HttpServletResponse response) {
        String uid = getUid(headers);
        return sseService.sseChat(uid, chatRequest);
    }

    /**
     * Closes the connection.
     *
     * @param headers request headers; the client id is read from them
     */
    @CrossOrigin
    @GetMapping("/closesse")
    public void closeConnect(@RequestHeader Map<String, String> headers) {
        String uid = getUid(headers);
        sseService.closeSse(uid);
    }
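
The `getUid(headers)` helper is not shown in the article. A minimal sketch, assuming it simply reads the `uid` header that the page sends with every request, might look like this; the class name and the choice of exception are assumptions.

```java
import java.util.Map;

// Hypothetical version of the getUid helper: returns the value of the "uid"
// request header, matching the header name case-insensitively.
class UidHeaderSketch {
    static String getUid(Map<String, String> headers) {
        for (Map.Entry<String, String> entry : headers.entrySet()) {
            String value = entry.getValue();
            if ("uid".equalsIgnoreCase(entry.getKey()) && value != null && !value.trim().isEmpty()) {
                return value.trim();
            }
        }
        // Without a uid the server cannot tell which emitter belongs to the caller.
        throw new IllegalArgumentException("missing uid header");
    }
}
```

Matching the name case-insensitively is a defensive choice, since HTTP header names are case-insensitive and the map keys may not arrive in the casing the page used.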

7. The main page code

<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>Smart Q&amp;A</title>
  <link rel="stylesheet" href="styles.css"> <!-- external CSS -->
  <script src="hzrecorder.js"></script>
  <script src="https://cdn.bootcdn.net/ajax/libs/jquery/3.6.0/jquery.min.js"></script>
  <script src="js/markdown.min.js"></script>
  <script src="js/eventsource.min.js"></script>
  <script>

      // Renders the accumulated Markdown text into the answer element.
      function setText(text, uuid_str) {
        let content = document.getElementById(uuid_str);
        if (content) {
          content.innerHTML = marked(text);
        }
      }

      function uuid() {
        var s = [];
        var hexDigits = "0123456789abcdef";
        for (var i = 0; i < 36; i++) {
          s[i] = hexDigits.substr(Math.floor(Math.random() * 0x10), 1);
        }
        s[14] = "4"; // bits 12-15 of the time_hi_and_version field to 0010
        s[19] = hexDigits.substr((s[19] & 0x3) | 0x8, 1); // bits 6-7 of the clock_seq_hi_and_reserved to 01
        s[8] = s[13] = s[18] = s[23] = "-";

        var uuid = s.join("");
        console.log(uuid);
        return uuid;
      }

      window.onload = function () {
        let messageElement = document.getElementById("messageinput");
        let chat = document.getElementById("chat-messages");
        let sse;
        let uid = window.localStorage.getItem("uid");
        if (uid == null || uid == "" || uid == "null") {
          uid = uuid();
        }
        let text = "";
        let uuid_str;
        // Persist the client id in local storage
        window.localStorage.setItem("uid", uid);

        // Click handler of the send button
        document.getElementById('sendtextbutton').addEventListener('click', function () {
          try {
            const userInput = messageElement.value.trim();
            if (userInput) {
              sseOneTurn(userInput);
              messageElement.value = ''; // clear the input box
            } else {
              alert('Please enter a text message!');
            }
          } catch (error) {
            alert('An error occurred while sending the message: ' + error.message);
          }
        });

        // Enter key handler
        messageElement.onkeydown = function (event) {
          if (event.keyCode === 13) {
            if (!messageElement.value) {
              return;
            }
            sseOneTurn(messageElement.value);
            messageElement.value = '';
          }
        };

        function sseOneTurn(inputText) {
          uuid_str = uuid();
          let questionSent = false;
          // Create the SSE connection
          const eventSource = new EventSourcePolyfill("/createsse", {
            headers: {
              uid: uid,
            },
          });

          eventSource.onopen = (event) => {
            console.log("Connection open, waiting for the backend output");
            sse = event.target;
            // Post the question only once the connection exists,
            // so the server can find the emitter for this uid.
            if (!questionSent) {
              questionSent = true;
              postQuestion(inputText);
            }
          };
          eventSource.onmessage = (event) => {
            if (event.lastEventId == "[tokens]") {
              text = text + event.data;
              setText(text, uuid_str);
              text = "";
              return;
            }
            if (event.data == "[done]") {
              text = "";
              if (sse) {
                sse.close();
              }
              return;
            }
            let json_data = JSON.parse(event.data);
            console.log(json_data);
            if (json_data.text == null || json_data.text == "null") {
              return;
            }
            text = text + json_data.text;
            setText(text, uuid_str);
          };
          eventSource.onerror = (event) => {
            console.log("onerror", event);
            alert("Service error. Please retry and contact the developer!");
            if (eventSource.readyState === 2) { // 2 === CLOSED
              console.log("connection is closed");
            } else {
              console.log("error occurred", event);
            }
            eventSource.close();
          };
          eventSource.addEventListener("customeventname", (event) => {
            console.log("message id is " + event.lastEventId);
          });
        }

        // Sends the question to the backend and adds the question and answer boxes to the page.
        function postQuestion(inputText) {
          $.ajax({
            type: "post",
            url: "/chat",
            data: JSON.stringify({
              msg: inputText,
            }),
            contentType: "application/json;charset=utf-8",
            dataType: "json",
            headers: {
              uid: uid,
            },
            success: function (result) {
              // Add the question box
              chat.innerHTML +=
                      '<tr><td style="height: 30px;">' +
                      inputText +
                      "<br/><br/> tokens:" +
                      result.question_tokens +
                      "</td></tr>";
              // Add the answer box
              chat.innerHTML +=
                      '<tr><td><article id="' +
                      uuid_str +
                      '" class="markdown-body"></article></td></tr>';
            },
            error: function () {
              console.info("Failed to send the question!");
            },
          });
        }
      };
    </script>
  </head>
<body>

<div class="chat-container">
  <div class="chat-header">
    <h1>Smart Q&amp;A</h1>
  </div>
  <div class="chat-messages" id="chat-messages">
    <!-- Chat messages are displayed here -->
  </div>
  <form class="message-form" onsubmit="return false;">
    <input type="text" id="messageinput" placeholder="Type a message..." autocomplete="off">
    <button type="button" id="sendtextbutton">Send text</button>
    <button type="button" id="recordanduploadbutton">Hold to record</button>
    <progress id="uploadprogress" value="0" max="100" style="display:none;"></progress>
  </form>
</div>

</body>

</html>

The final result looks like this:
