使用python接入腾讯云DeepSeek

news2025/2/26 10:28:01

本文主要从提供SSE方式接入DeepSeek,并通过fastapi websocket对外提供接入方法。
参考文档:
腾讯云大模型:https://cloud.tencent.com/document/product/1759/109380
fastAPI官网:https://fastapi.tiangolo.com/

WebSocketManager

提供WebsocketManager对websocket连接实例进行管理,代码片段如下:

from fastapi import WebSocket


class WsManager:
    """
    websocket manager
    """
    connectors: dict[str, WebSocket] = {}

    @staticmethod
    def get_connector(connector_id: str) -> WebSocket | None:
        """
        获取连接实例
        :param connector_id:
        :return:
        """
        connector = WsManager.connectors.get(connector_id)
        return connector

    @staticmethod
    def add_connector(connector_id: str, connector: WebSocket):
        """
        添加websocket客户端
        :param connector_id: 客户端ID
        :param connector: 客户端连接实例
        :return:
        """
        WsManager.connectors[connector_id] = connector

    @staticmethod
    def remove_connector(connector_id: str):
        """
        移除websocket连接
        :param connector_id: 连接ID
        :return:
        """
        try:
            del WsManager.connectors[connector_id]
        except Exception:
            pass

    @staticmethod
    async def send_message(connector_id: str, message: str):
        connector = WsManager.connectors.get(connector_id)
        if connector is None:
            return
        try:
            await connector.send_text(message)
        except Exception as e:
            print('消息发送失败')

    @staticmethod
    async def send_message_json(connector_id: str, message: dict):
        connector = WsManager.connectors.get(connector_id)
        if connector is None:
            return
        try:
            await connector.send_json(message)
        except Exception as e:
            print('消息发送失败')

接入DeepSeek

import uuid
import os
import requests
import json

from src.core.WsManager import WsManager
from .session import get_session

TCLOUD_URL = 'lke.tencentcloudapi.com'
TCLOUD_DeepSeek_SSE_URL = "https://wss.lke.cloud.tencent.com/v1/qbot/chat/sse"

SECRET_ID = "XXXXXXXXXXXXXXXXXXXX"
SECRET_KEY = "XXXXXXXXXXXXXXXXXXXXX"
REGION = "ap-guangzhou"
TYPE = 5


def get_session():
    """
    生成一个 UUID
    :return session id 字符串
    """
    new_uuid = uuid.uuid4()
    # 将 UUID 转换为字符串
    session_id = str(new_uuid)
    return session_id


def _resolve_message(message: str, prev_message: str):
    """
    处理消息
    :param message:
    :param prev_message:
    :return:
    """
    if prev_message == '':
        return None
    if prev_message.startswith("event:"):
        # 生成消息
        message = message.replace("data:", '').strip()
        try:
            message_json = json.loads(message)
            message_type = message_json.get('type')
            payload = message_json.get('payload')
            if message_type == 'token_stat':
                # token统计事件
                return None
            elif message_type == 'thought':
                # 思考事件
                return None
            elif message_type == 'error':
                # 错误事件
                return None
            elif message_type == 'reference':
                # 参考来源事件
                return None
            content = payload.get('content')
            record_id = payload.get('record_id')
            request_id = payload.get('request_id')
            session_id = payload.get('session_id')
            trace_id = payload.get('trace_id')
            message_id = payload.get('message_id')
            return {
                'type': message_type,
                'data': {
                    'content': content,
                    'record_id': record_id,
                    'request_id': request_id,
                    'session_id': session_id,
                    'trace_id': trace_id,
                    'message_id': message_id,
                }
            }
        except Exception as e:
            print(e)
    else:
        return None


async def get_q_a_result(visitor_id: str, question: str):
    """
    获取问答结果
    :param visitor_id: 访客ID
    :param question: 问题内容
    :return: 回答内容
    """
    session_id = get_session()
    req_data = {
        "content": f"{question}",
        "bot_app_key": "XXXXXXXX",  # 可以获取方式见下图
        "visitor_biz_id": visitor_id,
        "session_id": session_id,
        "streaming_throttle": 100
    }
    res = requests.post(TCLOUD_DeepSeek_SSE_URL, data=json.dumps(req_data),
                        stream=True, headers={"Accept": "text/event-stream"})
    prev_message: str = ''  # 上一条消息
    if res.status_code == 200:
        for line in res.iter_lines():
            if line:
                data = line.decode('utf-8')
                message = _resolve_message(data, prev_message)
                if message:
                    await WsManager.send_message_json(visitor_id, message)
                prev_message = data
    else:
        print('Failed to get data. Status code: {response.status_code}')
    # 关闭websocket连接
    connector = WsManager.get_connector(visitor_id)
    if connector:
        await connector.close()
        WsManager.remove_connector(visitor_id)

  1. 进入控制台
    在这里插入图片描述2. 创建应用,并点击调用
    在这里插入图片描述

通过fastapi 以websocket方式对外提供接口

这个接口是,前台连接websocket后,后台直接根据连接参数,开始调用问答,问答结束后自动关闭websocket连接

import asyncio

from fastapi import APIRouter, WebSocket, WebSocketDisconnect, BackgroundTasks
from fastapi.responses import JSONResponse

from src.core.WsManager import WsManager
from src.utils.tcloudLLM import get_example_recommend
from src.utils.session import get_visitor_id

router = APIRouter(prefix="/api/tcloud", tags=['腾信云大模型接口'])


@router.websocket("/ws/{compound_name}")
async def get_exercises_ws(compound_name: str, websocket: WebSocket, background_tasks: BackgroundTasks):
    await websocket.accept()
    visitor_id = get_visitor_id()
    WsManager.add_connector(visitor_id, websocket)
    # 本来使用BackgroundTask去做后台任务,结果不知道什么原因,调用不起来,然后换成asyncio处理
    asyncio.create_task(get_example_recommend(visitor_id, compound_name))  # 创建对应问答任务
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Message text was: {data}")
    except WebSocketDisconnect:
        WsManager.remove_connector(visitor_id)
    except Exception:
        WsManager.remove_connector(visitor_id)

如果需要进行多轮问答,提供为多次问答设置相关的 request_id 参数进行消息串联。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2306297.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

无法打开数据库 CAUsers\Public\EPLAN(Data\翻译\Company name\Translate.mdb。

eplan生成更新列表后报错,报错内容如下: 无法打开数据库 CAUsers\Public\EPLAN(Data\翻译\Company name\Translate.mdb。针对 64 位版本的EPLAN平台需要使用64 位版本的Microsoft Office。 原因:eplan的列表更新需要64位的微软办公软件版本支…

将CUBE或3DL LUT转换为PNG图像

概述 在大部分情况下,LUT 文件通常为 CUBE 或 3DL 格式。但是我们在 OpenGL Shader 中使用的LUT,通常是图像格式的 LUT 文件。下面,我将教大家如何将这些文件转换为 PNG 图像格式。 条形LUT在线转换(不是8x8网络)&am…

C语言(13)------------>do-while循环

1.do-while循环的语法 我们知道C语言有三大结构,顺序、选择、循环。我们可以使用while循环、for循环、do-while循环实现循环结构。之前的博客中提及到了前两者的技术实现。可以参考: C语言(11)------------->while循…

FS800DTU联动OneNET平台数据可视化View

目录 1 前言 2 环境搭建 2.1 硬件准备 2.2 软件环境 2.3 硬件连接 3 注册OneNET云平台并建立物模型 3.1 参数获取 3.2 连接OneNET 3.3上报数据 4 数据可视化View 4.1 用户信息获取 4.2 启用数据可视化View 4.3 创建项目 4.4 编辑项目 4.5 新增数据源 4.6 数据过滤器配置 4.6 项…

Linux 第三次脚本作业

源码编译安装httpd 2.4,提供系统服务管理脚本并测试(建议两种方法实现) 一、第一种方法 1、把 httpd-2.4.63.tar.gz 这个安装包上传到你的试验机上 2、 安装编译工具 (俺之前已经装好了) 3、解压httpd包 4、解压后的httpd包的文…

[数据结构笔记]数据结构必要的C语言基础

数据结构必要的C语言基础 使用C语言学习数据结构之前有一些必要了解的基础,许多同学在初学数据结构时因为对这些知识不熟,导致了对数据结构的畏惧心理。实际上很大一部分来自C语言的基础 C语言 结构体与指针 ​ 在一些场景中,如果传递给函…

数据结构笔记——06树和二叉树

文章目录 一、树的基本概念1.树的定义2.树的逻辑表示方法3.树的基本术语4.树的性质5.树的基本运算6.树的存储结构1)双亲存储结构2)孩子链存储结构3)孩子兄弟链存储结构 二、二叉树的概念和性质1.二叉树的定义2.二叉树的性质3.二叉树与树、森林之间的转换1)森林、树转换为二叉树…

蓝桥杯之日期题

文章目录 1.蓝桥杯必备知识点2. 题型13.需求2 1.蓝桥杯必备知识点 蓝桥杯是一个面向全国高校计算机相关专业学生的学科竞赛,涵盖多个赛道,常见的有软件类(如 C/C 程序设计、Java 软件开发、Python 程序设计)和电子类(…

PV Elite 27是专业的压力容器和热交换器设计解决方案

Intergraph PV Elite 27是专业的压力容器和热交换器设计解决方案。提供完整的容器和热交换器的设计,分析和评估解决方案。提供的完整的容器设计和分析、交换器设计和分析、管板设计和分析、矩形和非圆形容器分析、单个组件分析、综合误差检查、鞍座/支腿/吊耳/耳轴和…

Visual Studio 中的 /MD 与 /MT、动态库与静态库的深入解析

文章目录 1. /MD 与 /MT 的区别1.3 调试版本1.4 注意事项 2. 动态库与静态库的联系与区别2.3 联系与区别 3. 结合你的错误分析3.1 错误原因3.2 解决方案3.3 经验教训 4. 总结 在 Visual Studio 中进行 C/C 项目开发时,开发者经常需要对运行时库选项(例如…

QT入门--QMainWindow

从上向下依次是菜单栏,工具栏,铆接部件(浮动窗口),状态栏,中心部件 菜单栏 创建菜单栏 QMenuBar* mybar1 menuBar(); 将菜单栏放到窗口中 setMenuBar(mybar1); 创建菜单 QMenu *myfilemenu mybar1-…

深圳南柯电子|医疗设备EMC测试整改检测:零到一,保障医疗安全

在当今医疗科技飞速发展的时代,医疗设备的电磁兼容性(EMC)已成为确保其安全、有效运行的关键要素之一。EMC测试整改检测不仅关乎设备的性能稳定性,更是保障患者安全、避免电磁干扰引发医疗事故的重要措施。 一、医疗设备EMC测试整…

【链 表】

【链表】 一级目录1. 基本概念2. 算法分析2.1 时间复杂度2.2 空间复杂度2.3 时空复杂度互换 线性表的概念线性表的举例顺序表的基本概念顺序表的基本操作1. 初始化2. 插入操作3. 删除操作4. 查找操作5. 遍历操作 顺序表的优缺点总结优点缺点 树形结构图形结构单链表基本概念链表…

一周学会Flask3 Python Web开发-Jinja2模板过滤器使用

锋哥原创的Flask3 Python Web开发 Flask3视频教程: 2025版 Flask3 Python web开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili 在Jinja2中,过滤器(filter)是一些可以用来修改和过滤变量值的特殊函数,过滤器和变量用一个竖线 | &a…

【STM32H743IIT6】STM32H7的ADC时钟频率设置问题 —— 网上大多文章未注意到的要点!

前言 我使用的是定时器触发ADC采样。最近在想达到ADC的最高采样率的时候,发现一直却卡在1Msps上不去,直到在硬汉嵌入式的论坛里才发现了答案:[ADC] STM32H743/H750的Y版和V版芯片ADC的主频区别 这篇文章就详细的讲一下这个问题,这…

JavaScript基础(函数及面向对象)

函数 定义函数 Java定义方法: public 返回值类型 方法名(){ return 返回值 } 定义函数方法一 eg:定义一个绝对值函数 function abs(x) {if (x>0){return x;}else {return -x;}} 调用函数: 注意:一旦执行到return代表函数…

2025面试Go真题第一场

前几天参加了一场面试,GoLang 后端工程师,他们直接给了我 10 道题,我留了一个截图。 在看答案之前,你可以先简单做一下,下面我会对每个题目做一个说明。 文章目录 1、golang map 是否并发安全?2、协程泄漏的原因可能是…

【有奖实践】轻量消息队列(原 MNS)订阅 OSS 事件实时处理文件变动

当你需要对对象存储 OSS(Object Storage Service)中的文件变动进行实时处理、同步、监听、业务触发、日志记录等操作时, 你可以通过设置 OSS 的事件通知规则,自定义关注的文件,并将 OSS 事件推送到轻量消息队列&#x…

关于Postman自动获取token

在使用postman测试联调接口时,可能每个接口都需要使用此接口生成的令牌做Authorization的Bearer Token验证,最直接的办法可能会是一步一步的点击,如下图: 在Authorization中去选择Bearer Token,然后将获取到的token粘贴…

Baklib知识中台构建企业智慧中枢

智能技术架构构建路径 Baklib知识中台的技术架构设计以模块化和可扩展性为核心,通过分层解耦的架构体系实现知识管理的全流程覆盖。底层依托智能语义分析引擎与多模态知识图谱,完成非结构化数据的自动清洗与语义关联;中间层构建统一的知识资…