开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(五)

news2025/1/10 2:14:55

一、前言

    使用 FastAPI 可以帮助我们更简单高效地部署 AI 交互业务。FastAPI 提供了快速构建 API 的能力,开发者可以轻松地定义模型需要的输入和输出格式,并编写好相应的业务逻辑。

    FastAPI 的异步高性能架构,可以有效支持大量并发的预测请求,为用户提供流畅的交互体验。此外,FastAPI 还提供了容器化部署能力,开发者可以轻松打包 AI 模型为 Docker 镜像,实现跨环境的部署和扩展。

    总之,使用 FastAPI 可以大大提高 AI 应用程序的开发效率和用户体验,为 AI 模型的部署和交互提供全方位的支持。

    LangChain基础入门:开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(一),本篇学习如何集成LangChain进行模型交互,并使用工具获取实时信息


二、术语

2.1.FastAPI

    FastAPI 是一个用于构建 API 的现代、快速(高性能)的 Python Web 框架。它是基于标准 Python 类型注释的 ASGI (Asynchronous Server Gateway Interface) 框架。

FastAPI 具有以下主要特点:

  1. 快速: FastAPI 使用 ASGI 服务器和 Starlette 框架,在性能测试中表现出色。它可以与 Uvicorn 一起使用,提供非常高的性能。

  2. 简单: FastAPI 利用 Python 类型注释,使 API 定义变得简单且直观。开发人员只需要定义输入和输出模型,FastAPI 会自动生成 API 文档。

  3. 现代: FastAPI 支持 OpenAPI 标准,可以自动生成 API 文档和交互式文档。它还支持 JSON Schema 和数据验证。

  4. 全功能: FastAPI 提供了路由、依赖注入、数据验证、安全性、测试等功能,是一个功能齐全的 Web 框架。

  5. 可扩展: FastAPI 被设计为可扩展的。开发人员可以轻松地集成其他库和组件,如数据库、身份验证等。

2.2.WebSocket

    是一种计算机通信协议,它提供了在单个 TCP 连接上进行全双工通信的机制。它是 HTML5 一个重要的组成部分。

WebSocket 协议主要有以下特点:

  1. 全双工通信:WebSocket 允许客户端和服务器之间进行双向实时通信,即数据可以同时在两个方向上流动。这与传统的 HTTP 请求-响应模型不同,HTTP 中数据只能单向流动。

  2. 持久性连接:WebSocket 连接是一种持久性的连接,一旦建立就会一直保持,直到客户端或服务器主动关闭连接。这与 HTTP 的连接是短暂的不同。

  3. 低开销:相比 HTTP 请求-响应模型,WebSocket 在建立连接时需要较少的数据交换,因此网络开销较小。

  4. 实时性:由于 WebSocket 连接是持久性的,且数据可以双向流动,因此 WebSocket 非常适用于需要实时、低延迟数据交互的应用场景,如聊天应用、实时游戏、股票行情等。

2.3.Tool

    Tool(工具)是为了增强其语言模型的功能和实用性而设计的一系列辅助手段,用于扩展模型的能力。例如代码解释器(Code Interpreter)和知识检索(Knowledge Retrieval)等都属于其工具。

2.4.langchain预置的tools

    https://github.com/langchain-ai/langchain/tree/v0.1.16/docs/docs/integrations/tools

   基本这些工具能满足大部分需求,具体使用参见:


三、前置条件

3.1. 创建虚拟环境&安装依赖

  增加Google Search的依赖包

conda create -n fastapi_test python=3.10
conda activate fastapi_test
pip install fastapi websockets uvicorn
pip install --quiet  langchain-core langchain-community langchain-openai
pip install google-search-results

3.2. 注册Google Search API账号

1. 输入注册信息

可以使用Google账号登录,但仍要执行下面的认证操作

2. 需要认证邮箱

3. 需要认证手机

使用接码平台:https://sms-activate.io/cn

选择Google服务

注意

1. 购买的虚拟号码要与IP(科学上网)所在地一致,此处选择美国

2. 使用支付宝充值,最低需要充值2美金

4. 认证成功

3.3. 生成Google Search API的KEY


四、技术实现

4.1. Google Search小试

# -*- coding: utf-8 -*-
import os

from langchain_community.utilities.serpapi import SerpAPIWrapper

os.environ["SERPAPI_API_KEY"] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
serp = SerpAPIWrapper()
result = serp.run("广州的实时气温如何?")
print("实时搜索结果:", result)

调用结果:

4.2. 非流式输出

    本章代码将开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(三)基础上进行拓展

服务端:

import uvicorn
import os

from typing import Annotated
from fastapi import (
    Depends,
    FastAPI,
    WebSocket,
    WebSocketException,
    WebSocketDisconnect,
    status,
)
from langchain.agents import create_structured_chat_agent, AgentExecutor
from langchain_community.utilities import SerpAPIWrapper

from langchain_core.prompts import ChatPromptTemplate, SystemMessagePromptTemplate, HumanMessagePromptTemplate
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI

os.environ["OPENAI_API_KEY"] = 'sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'    #你的Open AI Key
os.environ["SERPAPI_API_KEY"] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"


class ConnectionManager:
    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)

manager = ConnectionManager()

app = FastAPI()

async def authenticate(
    websocket: WebSocket,
    userid: str,
    secret: str,
):
    if userid is None or secret is None:
        raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)

    print(f'userid: {userid},secret: {secret}')
    if '12345' == userid and 'xxxxxxxxxxxxxxxxxxxxxxxxxx' == secret:
        return 'pass'
    else:
        return 'fail'

@tool
def search(query:str):
    """只有需要了解实时信息或不知道的事情的时候才会使用这个工具,需要传入要搜索的内容。"""
    serp = SerpAPIWrapper()
    result = serp.run(query)
    print("实时搜索结果:", result)
    return result


def get_prompt():
    template='''
    Respond to the human as helpfully and accurately as possible. You have access to the following tools:
    
    {tools}
    
    Use a json blob to specify a tool by providing an action key (tool name) and an action_input key (tool input).
    
    Valid "action" values: "Final Answer" or {tool_names}
    
    Provide only ONE action per $JSON_BLOB, as shown:
    
    ```
    
    {{
    
      "action": $TOOL_NAME,
    
      "action_input": $INPUT
    
    }}
    
    ```
    
    Follow this format:
    
    Question: input question to answer
    
    Thought: consider previous and subsequent steps
    
    Action:
    
    ```
    
    $JSON_BLOB
    
    ```
    
    Observation: action result
    
    ... (repeat Thought/Action/Observation N times)
    
    Thought: I know what to respond
    
    Action:
    
    ```
    
    {{
    
      "action": "Final Answer",
    
      "action_input": "Final response to human"
    
    }}
    
    Begin! Reminder to ALWAYS respond with a valid json blob of a single action. Use tools if necessary. Respond directly if appropriate. Format is Action:```$JSON_BLOB```then Observation
    '''
    system_message_prompt = SystemMessagePromptTemplate.from_template(template)
    human_template='''
    {input}
    
    {agent_scratchpad}
    
     (reminder to respond in a JSON blob no matter what)
    '''
    human_message_prompt = HumanMessagePromptTemplate.from_template(human_template)
    prompt = ChatPromptTemplate.from_messages([system_message_prompt, human_message_prompt])

    return prompt

async def chat(query):
    global llm,tools
    agent = create_structured_chat_agent(
        llm, tools, get_prompt()
    )

    agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)

    result = agent_executor.invoke({"input": query})
    print(result['output'])
    yield result['output']

@app.websocket("/ws")
async def websocket_endpoint(*,websocket: WebSocket,userid: str,permission: Annotated[str, Depends(authenticate)],):
    await manager.connect(websocket)
    try:
        while True:
            text = await websocket.receive_text()

            if 'fail' == permission:
                await manager.send_personal_message(
                    f"authentication failed", websocket
                )
            else:
                if text is not None and len(text) > 0:
                    async for msg in chat(text):
                        await manager.send_personal_message(msg, websocket)

    except WebSocketDisconnect:
        manager.disconnect(websocket)
        print(f"Client #{userid} left the chat")
        await manager.broadcast(f"Client #{userid} left the chat")

if __name__ == '__main__':
    tools = [search]
    llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0, max_tokens=512)
    uvicorn.run(app, host='0.0.0.0',port=7777)

客户端:

<!DOCTYPE html>
<html>
    <head>
        <title>Chat</title>
    </head>
    <body>
        <h1>WebSocket Chat</h1>
        <form action="" onsubmit="sendMessage(event)">
            <label>USERID: <input type="text" id="userid" autocomplete="off" value="12345"/></label>
            <label>SECRET: <input type="text" id="secret" autocomplete="off" value="xxxxxxxxxxxxxxxxxxxxxxxxxx"/></label>
            <br/>
            <button onclick="connect(event)">Connect</button>
            <hr>
            <label>Message: <input type="text" id="messageText" autocomplete="off"/></label>
            <button>Send</button>
        </form>
        <ul id='messages'>
        </ul>
        <script>
            var ws = null;
            function connect(event) {
                var userid = document.getElementById("userid")
                var secret = document.getElementById("secret")
                ws = new WebSocket("ws://localhost:7777/ws?userid="+userid.value+"&secret=" + secret.value);
                ws.onmessage = function(event) {
                    var messages = document.getElementById('messages')
                    var message = document.createElement('li')
                    var content = document.createTextNode(event.data)
                    message.appendChild(content)
                    messages.appendChild(message)
                };
                event.preventDefault()
            }
            function sendMessage(event) {
                var input = document.getElementById("messageText")
                ws.send(input.value)
                input.value = ''
                event.preventDefault()
            }
        </script>
    </body>
</html>

调用结果:

用户输入:你好

不需要触发工具调用

模型输出:你好!有什么我可以帮忙的吗?

用户输入:广州现在天气如何?

需要调用工具

模型输出:The current weather in Guangzhou is partly cloudy with a temperature of 95°F, 66% chance of precipitation, 58% humidity, and wind speed of 16 mph. This information was last updated on Monday at 1:00 PM.

PS:

1. 在AI交互中,LangChain框架并不是必须引入,此处引用仅用于简化Openai的交互流程。

2. 页面输出的样式可以根据实际需要进行调整,此处仅用于演示效果。

3. 目前还遗留两个问题,一是如何实现流式输出,二是如何更好维护prompt模版,篇幅有限,下回分解


五、附带说明

5.1. 如何避免模型用英文回复

在提示词模版加入:Remember to answer in Chinese.  暗示模型一定要以中文进行回复。

修改后的提示语为:

Respond to the human as helpfully and accurately as possible. You have access to the following tools:
    
    {tools}
    
    Use a json blob to specify a tool by providing an action key (tool name) and an action_input key (tool input).
    
    Valid "action" values: "Final Answer" or {tool_names}
    
    Provide only ONE action per $JSON_BLOB, as shown:
    
    ```
    
    {{
    
      "action": $TOOL_NAME,
    
      "action_input": $INPUT
    
    }}
    
    ```
    
    Follow this format:
    
    Question: input question to answer
    
    Thought: consider previous and subsequent steps
    
    Action:
    
    ```
    
    $JSON_BLOB
    
    ```
    
    Observation: action result
    
    ... (repeat Thought/Action/Observation N times)
    
    Thought: I know what to respond
    
    Action:
    
    ```
    
    {{
    
      "action": "Final Answer",
    
      "action_input": "Final response to human"
    
    }}
    
    Begin! Reminder to ALWAYS respond with a valid json blob of a single action. Use tools if necessary. Respond directly if appropriate. Remember to answer in Chinese.Format is Action:```$JSON_BLOB```then Observation

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1884466.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

物联网工业级网关解决方案 工业4G路由器助力智慧生活

随着科技的飞速发展&#xff0c;无线通信技术正逐步改变我们的工作与生活。在这个智能互联的时代&#xff0c;一款高性能、稳定可靠的工业4G路由器成为了众多行业不可或缺的装备。工业4G路由器以其卓越的性能和多样化的功能&#xff0c;助力我们步入智慧新纪元。 一、快速转化&…

SpringBoot+ELK 收集日志的两种方式

方式一、FileBeatlogstash 7.5.1(docker)ES(docker)springboot 日志文件 应用方式 我们采用ELFK 架构采集日志&#xff0c;直接读取日志生成的文件&#xff0c;不对Springboot的日志任何的修改。也就是FileBeat 通过读取日志文件位置获取日志内容&#xff0c;然后发送至logsta…

综合项目实战--jenkins流水线

一、流水线定义 软件生产环节,如:需求调研、需求设计、概要设计、详细设计、编码、单元测试、集成测试、系统测试、用户验收测试、交付等,这些流程就组成一条完整的流水线。脚本式流水线(pipeline)的出现代表企业人员可以更自由的通过代码来实现不同的工作流程。 二、pi…

Flink 运行时架构

Flink 运行时的组件 作业管理器&#xff08;JobManager&#xff09;资源管理器&#xff08;ResourceManager&#xff09;任务管理器&#xff08;TaskManager&#xff09;分发器&#xff08;Dispatch&#xff09; JobManager 控制一个应用程序执行的主进程&#xff0c;也就是说…

IDEA 编译单个Java文件

文章目录 一、class文件的生成位置二、编译单个文件编译项目报错Error:java: 无效的源发行版: 8 一、class文件的生成位置 file->project structure->Modules 二、编译单个文件 选中文件&#xff0c;点击recompile 编译项目报错 Error:java: 无效的源发行版: 8 Fi…

从GPT到AGI:ChatGPT如何改变人机交互

在人工智能&#xff08;AI&#xff09;领域&#xff0c;ChatGPT等大语言模型&#xff08;LLM&#xff09;的出现&#xff0c;标志着一个新的时代。本文将深入探讨ChatGPT的技术原理、误解、潜在问题以及未来的发展方向和应用场景&#xff0c;并分析其对社会和商业领域的影响。 …

【Python数据分析及环境搭建】:教程详解1(第23天)

系列文章目录 Python进行数据分析的优势常用Python数据分析开源库介绍启动Jupyter服务Jupyter Notebook的使用 文章目录 系列文章目录前言学习目标1. Python进行数据分析的优势2. 常用Python数据分析开源库介绍2.1 NumPy2.2 Pandas2.3 Matplotlib2.4 Seaborn2.5 Sklearn2.6 Ju…

python 分析nginx的error.log日志 然后写入到 mongodb当中 并且解决mongodb无法根据id删除数据的问题

废话不多说 直接上代码 import re import os import pymongo import uuid import bson def extract_unresolved_info(log_path):unresolved_info []with open(log_path, r) as file:log_text file.read()lines log_text.split("\n")for line in lines:# 这种属于主…

汽车内饰塑料件光照老化实验箱

塑料件光照老化实验箱概述 塑料件光照老化实验箱&#xff0c;又称为氙灯老化试验箱&#xff0c;是一种模拟自然光照条件下塑料材料老化情况的实验设备。它通过内置的氙灯或其他光源&#xff0c;产生接近自然光的紫外线辐射&#xff0c;以此来加速塑料及其他材料的光老化过程。…

Open3D 点云CPD算法配准(粗配准)

目录 一、概述 二、代码实现 2.1关键函数 2.2完整代码 三、实现效果 3.1原始点云 3.2配准后点云 一、概述 在Open3D中&#xff0c;CPD&#xff08;Coherent Point Drift&#xff0c;一致性点漂移&#xff09;算法是一种经典的点云配准方法&#xff0c;适用于无序点云的非…

Python番外篇之责任转移:有关于虚拟机编程语言的往事

编程之痛 如果&#xff0c;你像笔者一样&#xff0c;有过学习或者使用汇编语言与C、C等语言的经历&#xff0c;一定对下面所说的痛苦感同身受。 汇编语言 将以二进制表示的一条条CPU的机器指令&#xff0c;以人类可读的方式进行表示。虽然&#xff0c;人类可读了&#xff0c…

Android Studio 2023版本切换DNK版本

选择自己需要的版本下载 根目录下的配置路劲注意切换 build.gradle文件下的ndkVersion也要配好对应版本

【web APIs】快速上手Day03

目录 Web APIs - 第3天全选文本框案例事件流事件捕获事件冒泡阻止冒泡解绑事件on事件方式解绑addEventListener方式解绑 注意事项-鼠标经过事件的区别两种注册事件的区别 事件委托综合案例-tab栏切换改造 其他事件页面加载事件元素滚动事件页面滚动事件-获取位置页面滚动事件-滚…

【java高级】【算法】通过子节点 反向获取 树路径父节点 且不获取无关节点

有一个奇葩需求 要求 用户配置在某选择框的选项 例如 然后在选择时显示 用户配置的选项 依旧是返回树,但是只包含 选择的子节点。 以及涉及的父节点,树路径 不返回无关节点 【一般】我们开发中都是直接通过 树节点 返回 其下子节点 这个需求的确很奇葩。 而且还要考…

生命在于学习——Python人工智能原理(3.1.1)

Python部分结束了&#xff0c;开始概率论部分 一、概率基本知识 1.1 事件与概率 1.1.1 事件的运算与关系 &#xff08;一&#xff09;基本概念 定义1 随机试验 如果一个试验满足如下条件&#xff1a; 在试验前不能断定其将发生什么结果&#xff0c;但可明确指出或说明试验…

Python系统教程01

Python 是一门解释性语言&#xff0c;相对更简单、易学&#xff0c;它可以用于解决数学问题、获取与分 析数据、爬虫爬取网络数据、实现复制数学算法等等。 1、print()函数&#xff1a; print()书写时注意所有的符号都是英文符号。print()输出内容时&#xff0c;若要输出字符…

【RabbitMQ问题踩坑】RabbitMQ设置手动ack后,消息队列有多条消息,只能消费一条,就不继续消费了,这是为什么 ?

现象&#xff1a;我发送5条消息到MQ队列中&#xff0c;同时&#xff0c;我在yml中设置的是需要在代码中手动确认&#xff0c;但是我把代码中的手动ack给关闭了&#xff0c;会出现什么情况&#xff1f; yml中配置&#xff0c;配置需要在代码中手动去确认消费者消费消息成功&…

赋能心理大模型,景联文科技推出高质量心理大模型数据库

生成式大模型作为当前发展势头最为强劲的人工智能前沿技术&#xff0c;其在临床心理学领域中的创新应用已成为社会关注和医学聚焦的热点之一。 心理大模型在落地应用过程中可能面临的痛点主要包括以下几个方面&#xff1a; 数据隐私与安全&#xff1a;确保敏感的个人信息在模型…

uniapp微信小程序电子签名

先上效果图&#xff0c;不满意可以直接关闭这页签 新建成单独的组件&#xff0c;然后具体功能引入&#xff0c;具体功能点击签名按钮&#xff0c;把当前功能页面用样式隐藏掉&#xff0c;v-show和v-if也行&#xff0c;然后再把这个组件显示出来。 【签名-撤销】原理是之前绘画时…

全球首款搭载Google Gemini和GPT-4o的智能眼镜发布

智能眼镜仍然是一个尚未完全成熟的未来概念&#xff0c;但生成式人工智能的到来显著提升了这些设备的能力。Meta 的 Ray-Ban 智能眼镜被许多人视为当今最好的选择之一&#xff0c;而现在 Solos AirGo Vision 正在为其带来竞争&#xff0c;这款眼镜还集成了 Google Gemini 支持。…