开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(六)

news2024/12/22 10:49:41

 一、前言

    使用 FastAPI 可以帮助我们更简单高效地部署 AI 交互业务。FastAPI 提供了快速构建 API 的能力,开发者可以轻松地定义模型需要的输入和输出格式,并编写好相应的业务逻辑。

    FastAPI 的异步高性能架构,可以有效支持大量并发的预测请求,为用户提供流畅的交互体验。此外,FastAPI 还提供了容器化部署能力,开发者可以轻松打包 AI 模型为 Docker 镜像,实现跨环境的部署和扩展。

    总之,使用 FastAPI 可以大大提高 AI 应用程序的开发效率和用户体验,为 AI 模型的部署和交互提供全方位的支持。

    本篇在开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(五)基础上,学习如何集成Tool获取实时数据,并以流式方式返回


二、术语

2.1.Tool

    Tool(工具)是为了增强其语言模型的功能和实用性而设计的一系列辅助手段,用于扩展模型的能力。例如代码解释器(Code Interpreter)和知识检索(Knowledge Retrieval)等都属于其工具。

2.2.langchain预置的tools

    https://github.com/langchain-ai/langchain/tree/v0.1.16/docs/docs/integrations/tools

   基本这些工具能满足大部分需求,具体使用参见:

2.3.LangChain支持流式输出的方法

  • stream:基本的流式传输方式,能逐步给出代理的动作和观察结果。
  • astream:异步的流式传输,用于异步处理需求的情况。
  • astream_events:更细致的流式传输,能流式传输代理的每个具体事件,如工具调用和结束、模型启动和结束等,便于深入了解和监控代理执行的详细过程。

2.4.langchainhub

    是 LangChain 相关工具的集合中心,其作用在于方便开发者发现和共享常用的提示(Prompt)、链、代理等。

    它受 Hugging Face Hub 启发,促进社区交流与协作,推动 LangChain 生态发展。当前,它在新架构中被置于 LangSmith 里,主要聚焦于 Prompt。

2.5.asyncio

    是一个用于编写并发代码的标准库,它提供了构建异步应用程序的基础框架。


三、前置条件

3.1. 创建虚拟环境&安装依赖

  增加Google Search以及langchainhub的依赖包

conda create -n fastapi_test python=3.10
conda activate fastapi_test
pip install fastapi websockets uvicorn
pip install --quiet  langchain-core langchain-community langchain-openai
pip install google-search-results langchainhub

3.2. 注册Google Search API账号

参见:开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(五)

3.3. 生成Google Search API的KEY


四、技术实现

4.1. 使用Tool&流式输出

# -*- coding: utf-8 -*-
import asyncio
import os
from langchain.agents import  create_structured_chat_agent, AgentExecutor
from langchain_community.utilities.serpapi import SerpAPIWrapper
from langchain_core.prompts import SystemMessagePromptTemplate, HumanMessagePromptTemplate, ChatPromptTemplate
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI

os.environ["OPENAI_API_KEY"] = 'sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'  # 你的Open AI Key
os.environ["SERPAPI_API_KEY"] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"


llm = ChatOpenAI(model="gpt-3.5-turbo",temperature=0,max_tokens=512)


@tool
def search(query:str):
    """只有需要了解实时信息或不知道的事情的时候才会使用这个工具,需要传入要搜索的内容。"""
    serp = SerpAPIWrapper()
    result = serp.run(query)
    print("实时搜索结果:", result)
    return result


tools = [search]

template='''
Respond to the human as helpfully and accurately as possible. You have access to the following tools:

{tools}

Use a json blob to specify a tool by providing an action key (tool name) and an action_input key (tool input).

Valid "action" values: "Final Answer" or {tool_names}

Provide only ONE action per $JSON_BLOB, as shown:

```

{{

  "action": $TOOL_NAME,

  "action_input": $INPUT

}}

```

Follow this format:

Question: input question to answer

Thought: consider previous and subsequent steps

Action:

```

$JSON_BLOB

```

Observation: action result

... (repeat Thought/Action/Observation N times)

Thought: I know what to respond

Action:

```

{{

  "action": "Final Answer",

  "action_input": "Final response to human"

}}

Begin! Reminder to ALWAYS respond with a valid json blob of a single action. Use tools if necessary. Respond directly if appropriate. Format is Action:```$JSON_BLOB```then Observation
'''
system_message_prompt = SystemMessagePromptTemplate.from_template(template)
human_template='''
{input}

{agent_scratchpad}

 (reminder to respond in a JSON blob no matter what)
'''
human_message_prompt = HumanMessagePromptTemplate.from_template(human_template)
prompt = ChatPromptTemplate.from_messages([system_message_prompt, human_message_prompt])


print(prompt)

agent = create_structured_chat_agent(
    llm, tools, prompt
)

agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)


async def chat(params):
    events = agent_executor.astream_events(params,version="v2")
    async for event in events:
        type = event['event']
        if 'on_chat_model_stream' == type:
            data = event['data']
            chunk =  data['chunk']
            content =  chunk.content
            if content and len(content) > 0:
                print(content)



asyncio.run(chat({"input": "广州现在天气如何?"}))

调用结果:

说明:

流式输出的数据结构为:

{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='天', id='run-92515b63-4b86-4af8-8515-2f84def9dfab')}, 'run_id': '92515b63-4b86-4af8-8515-2f84def9dfab', 'name': 'ChatOpenAI', 'tags': ['seq:step:3'], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 512, 'ls_stop': ['\nObservation']}}
type: on_chat_model_stream
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='气', id='run-92515b63-4b86-4af8-8515-2f84def9dfab')}, 'run_id': '92515b63-4b86-4af8-8515-2f84def9dfab', 'name': 'ChatOpenAI', 'tags': ['seq:step:3'], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 512, 'ls_stop': ['\nObservation']}}

4.2. 通过langchainhub使用公共prompt

   在4.1使用Tool&流式输出的代码基础上进行调整

# -*- coding: utf-8 -*-
import asyncio
import os
from langchain.agents import  create_structured_chat_agent, AgentExecutor
from langchain_community.utilities.serpapi import SerpAPIWrapper
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI

os.environ["OPENAI_API_KEY"] = 'sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'  # 你的Open AI Key
os.environ["SERPAPI_API_KEY"] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

from langchain import hub

llm = ChatOpenAI(model="gpt-3.5-turbo",temperature=0,max_tokens=512)


@tool
def search(query:str):
    """只有需要了解实时信息或不知道的事情的时候才会使用这个工具,需要传入要搜索的内容。"""
    serp = SerpAPIWrapper()
    result = serp.run(query)
    print("实时搜索结果:", result)
    return result


tools = [search]

prompt = hub.pull("hwchase17/structured-chat-agent")

print(prompt)

agent = create_structured_chat_agent(
    llm, tools, prompt
)

agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)



async def chat(params):
    events = agent_executor.astream_events(params,version="v2")
    async for event in events:
        type = event['event']
        if 'on_chat_model_stream' == type:
            data = event['data']
            chunk =  data['chunk']
            content =  chunk.content
            if content and len(content) > 0:
                print(content)


asyncio.run(chat({"input": "广州现在天气如何?"}))

调用结果:

4.3. 整合代码

在开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(五)的代码基础上进行调整

import uvicorn
import os

from typing import Annotated
from fastapi import (
    Depends,
    FastAPI,
    WebSocket,
    WebSocketException,
    WebSocketDisconnect,
    status,
)
from langchain import hub
from langchain.agents import create_structured_chat_agent, AgentExecutor
from langchain_community.utilities import SerpAPIWrapper

from langchain_core.tools import tool
from langchain_openai import ChatOpenAI

os.environ["OPENAI_API_KEY"] = 'sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'  # 你的Open AI Key
os.environ["SERPAPI_API_KEY"] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"


class ConnectionManager:
    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)

manager = ConnectionManager()

app = FastAPI()

async def authenticate(
    websocket: WebSocket,
    userid: str,
    secret: str,
):
    if userid is None or secret is None:
        raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)

    print(f'userid: {userid},secret: {secret}')
    if '12345' == userid and 'xxxxxxxxxxxxxxxxxxxxxxxxxx' == secret:
        return 'pass'
    else:
        return 'fail'

@tool
def search(query:str):
    """只有需要了解实时信息或不知道的事情的时候才会使用这个工具,需要传入要搜索的内容。"""
    serp = SerpAPIWrapper()
    result = serp.run(query)
    print("实时搜索结果:", result)
    return result


def get_prompt():
    prompt = hub.pull("hwchase17/structured-chat-agent")

    return prompt

async def chat(query):
    global llm,tools
    agent = create_structured_chat_agent(
        llm, tools, get_prompt()
    )

    agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)

    events = agent_executor.astream_events({"input": query}, version="v1")
    async for event in events:
        type = event['event']
        if 'on_chat_model_stream' == type:
            data = event['data']
            chunk = data['chunk']
            content = chunk.content
            if content and len(content) > 0:
                print(content)
                yield content


@app.websocket("/ws")
async def websocket_endpoint(*,websocket: WebSocket,userid: str,permission: Annotated[str, Depends(authenticate)],):
    await manager.connect(websocket)
    try:
        while True:
            text = await websocket.receive_text()

            if 'fail' == permission:
                await manager.send_personal_message(
                    f"authentication failed", websocket
                )
            else:
                if text is not None and len(text) > 0:
                    async for msg in chat(text):
                        await manager.send_personal_message(msg, websocket)

    except WebSocketDisconnect:
        manager.disconnect(websocket)
        print(f"Client #{userid} left the chat")
        await manager.broadcast(f"Client #{userid} left the chat")

if __name__ == '__main__':
    tools = [search]
    llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0, max_tokens=512)
    uvicorn.run(app, host='0.0.0.0',port=7777)

客户端:

<!DOCTYPE html>
<html>
    <head>
        <title>Chat</title>
    </head>
    <body>
        <h1>WebSocket Chat</h1>
        <form action="" onsubmit="sendMessage(event)">
            <label>USERID: <input type="text" id="userid" autocomplete="off" value="12345"/></label>
            <label>SECRET: <input type="text" id="secret" autocomplete="off" value="xxxxxxxxxxxxxxxxxxxxxxxxxx"/></label>
            <br/>
            <button onclick="connect(event)">Connect</button>
            <hr>
            <label>Message: <input type="text" id="messageText" autocomplete="off"/></label>
            <button>Send</button>
        </form>
        <ul id='messages'>
        </ul>
        <script>
            var ws = null;
            function connect(event) {
                var userid = document.getElementById("userid")
                var secret = document.getElementById("secret")
                ws = new WebSocket("ws://localhost:7777/ws?userid="+userid.value+"&secret=" + secret.value);
                ws.onmessage = function(event) {
                    var messages = document.getElementById('messages')
                    var message = document.createElement('li')
                    var content = document.createTextNode(event.data)
                    message.appendChild(content)
                    messages.appendChild(message)
                };
                event.preventDefault()
            }
            function sendMessage(event) {
                var input = document.getElementById("messageText")
                ws.send(input.value)
                input.value = ''
                event.preventDefault()
            }
        </script>
    </body>
</html>

调用结果:

用户输入:你好

不需要触发工具调用

模型输出:

用户输入:广州现在天气如何?

需要调用工具

模型输出:

```
Action:
```
{
  "action": "Final Answer",
  "action_input": "广州现在的天气是多云,温度为87华氏度,降水概率为7%,湿度为76%,风力为7英里/小时。"
}
```

PS:

1. 上面仅用于演示流式输出的效果,里面包含一些冗余的信息,例如:"action": "Final Answer",要根据实际情况过滤。

2. 页面输出的样式可以根据实际需要进行调整,此处仅用于演示效果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1887943.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【C语言】inline 关键字

在C语言中&#xff0c;inline关键字用于建议编译器对函数进行内联展开&#xff0c;而不是像普通函数一样调用。内联函数的目的是减少函数调用的开销&#xff0c;特别是对于简单的、频繁调用的函数。 内联函数的定义和使用 定义内联函数 要定义一个内联函数&#xff0c;需要在…

小红书运营教程02

小红书大致会分享10篇左右。微博、抖音、以及视频剪辑等自媒体运营相关技能以及运营教程相关会陆续的进行分享。 上次分享涉及到的对比,母婴系列,或者可以说是服装类型,不需要自己过多的投入,对比知识类博主来说,自己将知识讲述出来,然后要以此账号进行变现就比较麻烦,…

认识一下HttpMessageHandler处理管道

[S1208]HttpClient的默认管道结构 接下来我们通过如下的演示程序使用IHttpClientFactory工厂创建了 一个HttpClient对象&#xff0c;并查看其管道依次由哪些类型的HttpMessageHandler对象组成。如代码片段所示&#xff0c;我们定义了一个辅助方法PrintPipeline方法以递归的形式…

我等你,就在微软头马Open House开放日

当众讲话一直是我职业生涯中的重要部分&#xff0c;MSTMC 头马俱乐部更是我成长路上的重要伙伴。今天&#xff0c;我诚挚地邀请你参加即将在北京微软大厦举行的 微软头马Open House开放日活动&#xff01; 活动详情&#xff1a; &#x1f4c5; 日期&#xff1a;2024年7月3日&am…

Python 作业题1 (猜数字)

题目 你要根据线索猜出一个三位数。游戏会根据你的猜测给出以下提示之一&#xff1a;如果你猜对一位数字但数字位置不对&#xff0c;则会提示“Pico”&#xff1b;如果你同时猜对了一位数字及其位置&#xff0c;则会提示“Fermi”&#xff1b;如果你猜测的数字及其位置都不对&…

AI姓氏头像生成微信小程序系统源码

&#x1f525;【科技新潮流】AI姓氏头像生成系统&#xff0c;你的专属个性新名片&#xff01;&#x1f389; &#x1f31f; 开篇惊艳&#xff1a;一键解锁你的姓氏魅力 ✨ Hey小伙伴们&#xff0c;今天我要安利一个超酷炫的科技小玩意——AI姓氏头像生成系统&#xff01;是不…

API 授权最佳实践

API&#xff08;应用程序编程接口&#xff09;就像秘密之门&#xff0c;允许不同的软件程序进行通信。但并不是每个人都应该拥有每扇门的钥匙&#xff0c;就像不是每个软件都应该不受限制地访问每个 API 一样。 这些 API 将从银行的移动应用程序到您最喜欢的社交媒体平台的所有…

python机器人编程——用pytorch实现六轴机械臂的正向和逆向数值解算,及python算法解析

目录 一、前言二、实现原理2.1正向建模2.2张量化2.3绘制3D动画及操作UI 三、结论四、python源码PS.扩展阅读ps1.六自由度机器人相关文章资源ps2.四轴机器相关文章资源ps3.移动小车相关文章资源 一、前言 前面对六轴&#xff08;或多轴&#xff09;机械臂进行了一些研究&#x…

SCI一区级 | Matlab实现BO-Transformer-LSTM时间序列预测

SCI一区级 | Matlab实现BO-Transformer-LSTM时间序列预测 目录 SCI一区级 | Matlab实现BO-Transformer-LSTM时间序列预测效果一览基本介绍![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/caef5d232c4b436ebb01d717819d5ff1.png)程序设计参考资料 效果一览 基本介绍…

C语言的数据结构:图的基本概念

前言 之前学过了其它的数据结构&#xff0c;如&#xff1a; 集合 \color{#5ecffd}集合 集合 —— 数据元素属于一个集合。 线型结构 \color{#5ecffd}线型结构 线型结构 —— 一个对一个&#xff0c;如线性表、栈、队列&#xff0c;每一个节点和其它节点之间的关系 一个对一个…

【Linux】生物信息学常用命令

参考资料来自生信技能树 先输入echo export PS1"[\033]2;\h:\u \w\007\033[33;1m]\u \033[35;1m\t\033[0m [\033[36;1m]\w[\033[0m]\n[\e[32;1m]$ [\e[0m]" >> ~/.bashrc 再输入source ~/.bashrc就能够让命令字体带上颜色&#xff0c;同时命令将会在下一行开…

Spark on k8s 源码解析执行流程

Spark on k8s 源码解析执行流程 1.通过spark-submit脚本提交spark程序 在spark-submit脚本里面执行了SparkSubmit类的main方法 2.运行SparkSubmit类的main方法&#xff0c;解析spark参数&#xff0c;调用submit方法 3.在submit方法里调用doRunMain方法&#xff0c;最终调用r…

LiveNVR监控流媒体Onvif/RTSP用户手册-视频广场:状态记录、播放、回放入口、筛选在线离线、搜索

LiveNVR监控流媒体Onvif/RTSP用户手册-视频广场:状态记录、播放、回放入口、筛选在线离线、搜索 1、视频广场1.1、搜索筛选1.2、状态记录1.3、播放1.4、视频信息1.5、回放入口 2、RTSP/HLS/FLV/RTMP拉流Onvif流媒体服务 1、视频广场 1.1、搜索筛选 可以下拉筛选 在线、离线 &a…

小红书 达芬奇:生活问答 AI 机器人

小红书去年 9 月开始内测的生活问答 AI 机器人&#xff1a;达芬奇&#xff0c;现在可以在小红书 APP 上用了 得益于小红书平台的特性&#xff0c;该助手擅长吃、住、宠、喝、学等等各类生活知识&#xff0c;目前还在搞活动&#xff0c;写评测笔记最高得 666 元

《后端程序猿 · 基于 Lettuce 实现缓存容错策略》

&#x1f4e2; 大家好&#xff0c;我是 【战神刘玉栋】&#xff0c;有10多年的研发经验&#xff0c;致力于前后端技术栈的知识沉淀和传播。 &#x1f497; &#x1f33b; 近期刚转战 CSDN&#xff0c;会严格把控文章质量&#xff0c;绝不滥竽充数&#xff0c;如需交流&#xff…

LinkedList底层原理

LinkedList特有方法 源码分析

Redis 缓存预热、缓存雪崩、缓存击穿、缓存穿透业务实践

0、前言 本文所有代码可见 > 【gitee code demo】 本文会涉及 缓存预热、缓存雪崩、缓存击穿、缓存穿透介绍和解决方案业务实践 1、缓存预热 1.1、描述 提前将热点数据加载到缓存&#xff0c;提前响应&#xff0c;降低后端数据源访问压力 1.2、实践 Autowiredprivate R…

Python基础002

Python数据类型 1、字符串&#xff08;str&#xff09; str3 """I miss you so much""" print("str3 ", str3,type(str3)) str3 I miss you so much <class str>2、整数&#xff08;int&#xff09; str1 55 print(&quo…

什么是定时器?

前言&#x1f440;~ 上一章我们介绍了阻塞队列以及生产者消息模式&#xff0c;今天我们来讲讲定时器 定时器 标准库中的定时器 schedule()方法 扫描线程 手动实现定时器 任务类 存储任务的数据结构 定时器类 如果各位对文章的内容感兴趣的话&#xff0c;请点点小赞&am…

【postgresql】数据库操作

创建数据库 使用 CREATE DATABASE SQL 语句来创建 语法&#xff1a; CREATE DATABASE dbname; 使用 createdb 命令来创建 语法&#xff1a; createdb [option...] [dbname [description]] 参数说明&#xff1a; dbname&#xff1a;要创建的数据库名。 description&…