问题描述
调用sse流式接口使用httpx_sse的方式
import httpx from httpx_sse import connect_sse # 省略无关代码 try: with httpx.client() as client: with connect_sse(client, "get", url, params=param) as event_source: clear_textbox(response_textbox) # 把 iter_sse() 迭代完, 就相当于处理完了一次流式调用 for sse in event_source.iter_sse(): # 流式响应中,每次响应体的处理逻辑 print(f"generated_answer的值是: '{sse.data}'") response = sse.data if response != '': # self.response = response append_text(response_textbox, response) except httpx.requesterror as e: print(f"请求错误:{e}") except exception as e: print(f"发生了一个错误:{e}")
httpx_sse的connet_sse源码:
@contextmanager def connect_sse( client: httpx.client, method: str, url: str, **kwargs: any ) -> iterator[eventsource]: headers = kwargs.pop("headers", {}) headers["accept"] = "text/event-stream" headers["cache-control"] = "no-store" with client.stream(method, url, headers=headers, **kwargs) as response: yield eventsource(response)
可以看到connect_sse源码中的headers的"accept"设置了只接受"text/event-stream"流式结果,正常这么调用是没错的。但是当后端的流式接口因为401权限问题等报错返回了"application/json"格式,如
{ “code”:401, “msg”:“登录过期,请重新登录”, “data”:null} 这样的json格式结果时,以上代码就会报错,因为他不是"text/event-stream"流式响应结果头。那么该怎么办呢?
方案
重新写一个自定义的connect_sse。
import httpx from httpx_sse import eventsource from typing import any, iterator from contextlib import contextmanager import json # 自定义调用sse接口 @contextmanager def custom_connect_sse( self, client: httpx.client, method: str, url: str, **kwargs: any ) -> iterator[eventsource]: headers = kwargs.pop("headers", {}) # 只有当没有指定accept时才添加默认值 headers["accept"] = "*/*" headers["cache-control"] = "no-store" with client.stream(method, url, headers=headers, **kwargs) as response: content_type = response.headers.get('content-type', '').lower() json_flag = false if 'text/event-stream' in content_type: # 处理sse流 yield json_flag, eventsource(response) elif 'application/json' in content_type: # yield response # 在这里你可以决定如何进一步处理这个json响应 # 读取并合并所有文本块 text_data = ''.join([chunk for chunk in response.iter_text()]) # 解析整个响应体为json json_data = json.loads(text_data) json_flag = true yield json_flag, json_data
调用代码
# 使用自定义的connect_sse函数 try: with httpx.client() as client: with self.custom_connect_sse(client, "get", url, params=param, headers=headers) as (json_flag, event_source): if json_flag: code = event_source.get("code") msg = event_source.get("msg") print(f"code: [code], message: {msg}") else: full_answer = "" clear_textbox(response_textbox) for sse in event_source.iter_sse(): print(f"generated_answer的值是: '{sse.data}'") response = sse.data if response: append_text(response_textbox, response) full_answer += response user_record += reply + full_answer + "\n" print(f"user_record:{user_record}") except httpx.requesterror as e: print(f"请求错误:{e}") except exception as e: print(f"发生了一个错误:{e}")
关键步骤:
1.设置headers[“accept”] = “/”,所有响应头都可以接收
2.content_type = response.headers.get(‘content-type’, ‘’).lower() 判断响应头是流式还是json,并用json_flag记录是否json标识,返回不同的结果。如果是json,则循环合并处理chunk块,拼装完整json返回结果(实测第一次就返回完整json结构了,但是代码得这么写)。
3.使用自定义connect_sse方法时,根据json_flag来分别处理成功调用流式结果还是异常的json结果。
到此这篇关于python处理application/json错误的方法详解的文章就介绍到这了,更多相关python处理application/json错误内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论