python使用httpx_sse调用sse流式接口对响应格式为application/json的错误信息的处理

news2025/2/24 5:23:22

目录

  • 问题描述
  • 方案

问题描述

调用sse流式接口使用httpx_sse的方式

				import httpx
				from httpx_sse import connect_sse
				# 省略无关代码
				try:
                    with httpx.Client() as client:
                        with connect_sse(client, "GET", url, params=param) as event_source:
                            clear_textbox(response_textbox)
                            # 把 iter_sse() 迭代完, 就相当于处理完了一次流式调用
                            for sse in event_source.iter_sse():
                                # 流式响应中,每次响应体的处理逻辑
                                print(f"generated_answer的值是: '{sse.data}'")
                                response = sse.data
                                if response != '':
                                    # self.response = response
                                    append_text(response_textbox, response)

                except httpx.RequestError as e:
                    print(f"请求错误:{e}")
                except Exception as e:
                    print(f"发生了一个错误:{e}")

httpx_sse的connet_sse源码:

@contextmanager
def connect_sse(
    client: httpx.Client, method: str, url: str, **kwargs: Any
) -> Iterator[EventSource]:
    headers = kwargs.pop("headers", {})
    headers["Accept"] = "text/event-stream"
    headers["Cache-Control"] = "no-store"

    with client.stream(method, url, headers=headers, **kwargs) as response:
        yield EventSource(response)

可以看到connect_sse源码中的headers的"Accept"设置了只接受"text/event-stream"流式结果,正常这么调用是没错的。但是当后端的流式接口因为401权限问题等报错返回了"application/json"格式,如
{ “code”:401, “msg”:“登录过期,请重新登录”, “data”:null} 这样的json格式结果时,以上代码就会报错,因为他不是"text/event-stream"流式响应结果头。那么该怎么办呢?

方案

重新写一个自定义的connect_sse。

import httpx
from httpx_sse import EventSource
from typing import Any, Iterator
from contextlib import contextmanager
import json
# 自定义调用sse接口
    @contextmanager
    def custom_connect_sse(
            self, client: httpx.Client, method: str, url: str, **kwargs: Any
    ) -> Iterator[EventSource]:
        headers = kwargs.pop("headers", {})
        # 只有当没有指定Accept时才添加默认值
        headers["Accept"] = "*/*"
        headers["Cache-Control"] = "no-store"

        with client.stream(method, url, headers=headers, **kwargs) as response:
            content_type = response.headers.get('content-type', '').lower()
            json_flag = False
            if 'text/event-stream' in content_type:
                # 处理SSE流
                yield json_flag, EventSource(response)
            elif 'application/json' in content_type:
                # yield response  # 在这里你可以决定如何进一步处理这个JSON响应
                # 读取并合并所有文本块
                text_data = ''.join([chunk for chunk in response.iter_text()])
                # 解析整个响应体为JSON
                json_data = json.loads(text_data)
                json_flag = True
                yield json_flag, json_data

调用代码

# 使用自定义的connect_sse函数
                try:
                    with httpx.Client() as client:
                        with self.custom_connect_sse(client, "GET", url, params=param, headers=headers) as (json_flag, event_source):
                            if json_flag:
                                code = event_source.get("code")
                                msg = event_source.get("msg")
                                print(f"Code: {code}, Message: {msg}")
                            else:
                                full_answer = ""
                                clear_textbox(response_textbox)
                                for sse in event_source.iter_sse():
                                    print(f"generated_answer的值是: '{sse.data}'")
                                    response = sse.data
                                    if response:
                                        append_text(response_textbox, response)
                                        full_answer += response
                                user_record += reply + full_answer + "\n"
                                print(f"user_record:{user_record}")

                except httpx.RequestError as e:
                    print(f"请求错误:{e}")
                except Exception as e:
                    print(f"发生了一个错误:{e}")

关键步骤:
1.设置headers[“Accept”] = “/”,所有响应头都可以接收
2.content_type = response.headers.get(‘content-type’, ‘’).lower() 判断响应头是流式还是json,并用json_flag记录是否json标识,返回不同的结果。如果是json,则循环合并处理chunk块,拼装完整json返回结果(实测第一次就返回完整json结构了,但是代码得这么写)。
3.使用自定义connect_sse方法时,根据json_flag来分别处理成功调用流式结果还是异常的json结果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2304203.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

在列线图上标记做为线性模型的局部解释

改造列线图做为线性模型的解释 除了使用列线图算法产生的meta数据和score数据进行线性模型的解释(全局性解释和局部性解释),另外一种做法是改造列线图来作为线性模型的解释。这里尝试改造列线图来对线性模型进行全局性和局部性解释。 全局…

KubeKey一键安装部署k8s集群和KubeSphere详细教程

目录 一、KubeKey简介 二、k8s集群KubeSphere安装 集群规划 硬件要求 Kubernetes支持版本 操作系统要求 SSH免密登录 配置集群时钟 所有节点安装依赖 安装docker DNS要求 存储要求 下载 KubeKey 验证KubeKey 配置集群文件 安装集群 验证命令 登录页面 一、Ku…

车载诊断数据库 --- AUTOSAR诊断文件DEXT简介

我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 简单,单纯,喜欢独处,独来独往,不易合同频过着接地气的生活,除了生存温饱问题之外,没有什么过多的欲望,表面看起来很高冷,内心热情,如果你身…

庙算兵棋推演AI开发初探(5-数据处理)

碎碎念:这最近几个月过得那叫一个难受,研究生开题没过、需求评审会在4月和6月开了2次、7月紧接着软件设计评审会,加班干得都是文档的事情,还有开会前的会务和乱七八糟的琐事,我们干的还被规定弄的束手束脚,…

【MyBatis】#{} 与 ${} 的区别(常见面试题)

目录 前言 预编译SQL和即时SQL 什么是预编译SQL? 什么是即时SQL? 区别 #{} 与 ${}的使用 防止SQL注入 什么是SQL注入? 原理 排序功能 模糊查询 总结#{}和${}的区别 前言 在前面的学习中,我们已经知道了如果SQL语句想…

鸿蒙开发环境搭建-入门篇

本文章讲述如何搭建鸿蒙应用开发环境:新建工程、虚拟机运行、真机调试等。 开发工具: DevEco Studio 5.0.3.906 os系统: mac 参考文档:https://juejin.cn/post/7356143704699699227 官网鸿蒙应用开发学习文档:https://developer.huawei.com/c…

iOS开发 网络安全

iOS开发中的网络安全 在当前的数字化时代,任何应用程序都需要重视网络安全。尤其是对于iOS应用开发者而言,确保应用与服务器之间的数据传输安全是至关重要的。接下来,我们将学习“iOS开发 网络安全”的实现过程。 流程步骤 以下是实现iOS网…

MATLAB在投资组合优化中的应用:从基础理论到实践

引言 投资组合优化是现代金融理论中的核心问题之一,旨在通过合理配置资产,实现风险与收益的最佳平衡。MATLAB凭借其强大的数学计算能力和丰富的金融工具箱,成为投资组合优化的理想工具。本文将详细介绍如何使用MATLAB进行投资组合优化&#…

银河麒麟系统安装mysql5.7【亲测可行】

一、安装环境 cpu:I5-10代; 主板:华硕; OS:银河麒麟V10(SP1)未激活 架构:Linux 5.10.0-9-generic x86_64 GNU/Linux mysql版本:mysql-5.7.34-linux-glibc2.12-x86_64.ta…

自动创建spring boot应用(eclipse版本)

使用spring starter project创建项目 设置Service URL 把Service URL设置为 https://start.aliyun.com/ 如下图: 使用这个网址,创建项目更快。 选择Spring Web依赖 项目结构 mvnw和mvnw.cmd:这是maven包装器(wrapper)脚本&…

基于Flask的第七次人口普查数据分析系统的设计与实现

【Flask】基于Flask的第七次人口普查数据分析系统的设计与实现(完整系统源码开发笔记详细部署教程)✅ 目录 一、项目简介二、项目界面展示三、项目视频展示 一、项目简介 基于Flask的人口普查可视化分析系统 二、项目界面展示 登录/注册 首页/详情 …

Linux:文件(三)

1. 磁盘 基本概念 机械磁盘在现在的计算机中基本是唯一的一个机械设备 速度较内存更慢,容量大价格便宜。 磁盘是永久性存储介质,断电后数据还在。 内存是易失性存储介质,断电后(未写入磁盘的)数据丢失。 物理存储结构 扇区:…

DeepSeek 给我一个 DeepSeekUI 页面

接着上次分享内容 三步安装 DeepSeek 说,DeepSeek 下载好了,总不能是黑框框对话吧,总得找一个 UI 界面使用吧。 本地运行 DeepSeek 比安装 python、jdk 简单多了,本地还没装过的可以参考上次的文档安装。 于是找了几个开源的试了试…

Java NIO与传统IO性能对比分析

Java NIO与传统IO性能对比分析 在Java中,I/O(输入输出)操作是开发中最常见的任务之一。传统的I/O方式基于阻塞模型,而Java NIO(New I/O)引入了非阻塞和基于通道(Channel)和缓冲区&a…

小智机器人CMakeLists编译文件解析

编译完成后,成功烧录! 这段代码是一个CMake脚本,用于配置和构建一个嵌入式项目,特别是针对ESP32系列芯片的项目。CMake是一个跨平台的构建系统,用于管理项目的编译过程。 set(SOURCES "audio_codecs/audio_code…

【科研绘图系列】R语言绘制SCI论文图合集

禁止商业或二改转载,仅供自学使用,侵权必究,如需截取部分内容请后台联系作者! 文章目录 介绍加载R包数据下载Load dataFigure 1Fig 1B: functional assays adhensionFIG 1C: Functional assays OPK Figure 2Fig 2C: Settings and function fo…

VSCode ssh远程连接内网服务器(不能上网的内网环境的Linux服务器)的终极解决方案

VSCode ssh远程连接内网服务器(不能上网的内网环境的Linux服务器) 离线下载vscode-server并安装: 如果远程端不能联网可以下载包离线安装,下载 vscode-server 的 url 需要和 vscode 客户端版本的 commit-id 对应.通过 vscode 面板的帮助->关于可以获…

支持向量机(SVM):算法讲解与原理推导

1 SVM介绍 SVM是一个二类分类器,它的全称是Support Vector Machine,即支持向量机。 SVM的目标是找到一个超平面,使用两类数据离这个超平面越远越好,从而对新的数据分类更准确,即使分类器更加健壮。比如上面的图中&am…

macos sequoia 禁用 ctrl+enter 打开鼠标右键菜单功能

macos sequoia默认ctrlenter会打开鼠标右键菜单,使得很多软件有冲突。关闭方法: end

Android14 Camera框架中Jpeg流buffer大小的计算

背景描述 Android13中,相机框架包含对AIDL Camera HAL的支持,在Android13或更高版本中添加的相机功能只能通过AIDL Camera HAL接口使用。 对于Android应用层来说,使用API34即以后版本的Camera应用程序通过Camera AIDL Interface访问到HAL层…