协程在多个模型流式输出中的使用实例

news2024/10/23 5:57:58

python 协程
python 生成器的作用
协程在多个模型流式输出中的使用实例

服务端

import json
from fastapi import FastAPI, Query, Path
import uvicorn
from starlette.requests import Request
from sse_starlette import EventSourceResponse

import asyncio
from asyncio import Queue

app = FastAPI()
 
@app.get("/v1/models")
async def get_models():
    data = {
  "data": [
        {
        "id": "Qwen1.5-7B",						# openai 支持模型id
        "object": "model",					    # openai 支持模型类别
        "owned_by": "organization-owner",	    # openai 支持模型所有者
        "permission": []						# openai 支持模型权限,暂时不支持
        },
        {
        "id": "chatglm3-6b",
        "object": "model",
        "owned_by": "organization-owner",
        "permission": []
        }
    ],
    "object": "list"							# data 类型
    }
    return data


async def output_data(text: str, model: str):
    output = ""
    for idx, word in enumerate(text):
        output += word
        chunk = {
            "id": None,
            "choices": [
                {
                    "delta": {
                        "content": f"{model} {idx} {output}",
                        "function_call": None, 					# OpenAI返回,未知
                        "role": "assistant",					# OpenAI系统消息角色
                        "tool_calls": None						# OpenAI返回,未知
                    },
                    "finish_reason": "length",					# OpenAI停止码
                    "index": 0,								    # OpenAI返回,未知
                    "logprobs": None							# OpenAI返回,未知
                }
            ],
            "created": 1715238637, 						    # 时间戳
            "model": model,							        # OpenAI模型id
            "object": "chat.completion.chunk",				# OpenAI消息类型
            "system_fingerprint": None						# OpenAI返回,未知
        }

        data = json.dumps(chunk, ensure_ascii=False)
        yield data
        await asyncio.sleep(1)


@app.post("/v1/chat/completions")
async def flush_stream(request: Request):
    models = ["chatglm3", "qwen"]

    async def async_generate(index:int, model: str, queue: Queue):
        text = "这是一个流式输出他会将每个字挨个的输出哈哈!!!"
        if model == "chatglm3":
            text = "我是chatglm3的流式输出嘿嘿!!!"
        items_data = output_data(text=text, model=model)
        
        async for item_data in items_data:
            # print(f"generate### {model} {item_data}")
            # yield item_data
            queue.put_nowait((index, item_data))
        queue.put_nowait((index, None))

    async def async_consumer(queue: Queue, indices: list, timeout: float):
        indices = set(indices)
        finished = set()

        while indices != finished:
            try:
                index, response = await asyncio.wait_for(queue.get(), timeout)
                if response is None:
                    finished.add(index)
                    print("consumer queue indices finished", indices, finished)
                yield (index, response)
            except TimeoutError:
                break

    async def async_process(models: list):
        ### 1. 支持多个模型的流式输出
        queue = Queue()
        tasks = [
            asyncio.create_task(async_generate(index=index, model=model, queue=queue))
            for index, model in enumerate(models)
        ]
        
        all_text = [dict() for _ in range(len(models))]
        async for index, response in async_consumer(queue=queue, indices=list(range(len(tasks))), timeout=10):
            if response is not None:
                all_text[index] = response

                print(f"process ####### {all_text}")
                res_text = json.dumps(all_text)+"\n"
                yield res_text
        print(f"process END END")


        # ### 2.支持单个模型的流式输出
        # text = "这是一个流式输出他会将每个字挨个的输出哈哈!!!"
        # items_data = output_data(text=text, model=models[0])
        # async for item_data in items_data:
        #     yield item_data

    return EventSourceResponse(async_process(models), media_type="text/event-stream")
    # return EventSourceResponse(async_process(models), media_type="text/plain")
 
if __name__ == '__main__':
    uvicorn.run(app, host="0.0.0.0", port=8080)

运行:

> python.exe .\main.py
INFO:     Started server process [12872]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8080 (Press CTRL+C to quit)
INFO:     127.0.0.1:64346 - "POST /v1/chat/completions HTTP/1.1" 200 OK
process ####### ['{"id": null, "choices": [{"delta": {"content": "chatglm3 0 我", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "chatglm3", "object": "chat.completion.chunk", "system_fingerprint": null}', {}]
process ####### ['{"id": null, "choices": [{"delta": {"content": "chatglm3 0 我", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "chatglm3", "object": "chat.completion.chunk", "system_fingerprint": null}', '{"id": null, "choices": [{"delta": {"content": "qwen 0 这", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "qwen", "object": "chat.completion.chunk", "system_fingerprint": null}']
......
process ####### ['{"id": null, "choices": [{"delta": {"content": "chatglm3 19 我是chatglm3的流式输出嘿嘿!!!", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "chatglm3", "object": "chat.completion.chunk", "system_fingerprint": null}', '{"id": null, "choices": [{"delta": {"content": "qwen 23 这是一个流式输出他会将每个字挨个的输出哈哈!!!", "function_call": null, "role": "assistant", 
"tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "qwen", "object": "chat.completion.chunk", "system_fingerprint": null}']
consumer queue indices finished {0, 1} {0, 1}
process END END

客户端

from openai import OpenAI

client = OpenAI(
        api_key="EMPTY",
        base_url="http://127.0.0.1:8080/v1/"
    )
response = client.chat.completions.create(
        model="EMPTY",
        messages=[
            # {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "你好"}
            # {"role": "user", "content": "谁是特朗普"},
            # {"role": "assistant", "content": "特朗普是美国前总统"},
            # {"role": "user", "content": "特朗普多大年纪了"},
        ],
        functions=None,
        temperature=1,
        top_p=0,
        max_tokens=20,
        stream=True,
    )

print("#####", response)
ret_text = ""
for part in response:
    # print("\n33333", type(part), part)
    print("\n33333", type(part), part[0])
    print("33333", type(part), part[1])

运行:

> python.exe .\stream.py
##### <openai.Stream object at 0x0000024D1C322590>

33333 <class 'list'> {"id": null, "choices": [{"delta": {"content": "chatglm3 0 我", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "chatglm3", "object": "chat.completion.chunk", "system_fingerprint": null}
33333 <class 'list'> ChatCompletionChunk(id=None, choices=None, created=None, model=None, object=None, service_tier=None, system_fingerprint=None, usage=None)        

33333 <class 'list'> {"id": null, "choices": [{"delta": {"content": "chatglm3 0 我", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "chatglm3", "object": "chat.completion.chunk", "system_fingerprint": null}
33333 <class 'list'> {"id": null, "choices": [{"delta": {"content": "qwen 0 这", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "qwen", "object": "chat.completion.chunk", "system_fingerprint": null}
......

33333 <class 'list'> {"id": null, "choices": [{"delta": {"content": "chatglm3 19 我是chatglm3的流式输出嘿嘿!!!", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "chatglm3", "object": "chat.completion.chunk", "system_fingerprint": null}
33333 <class 'list'> {"id": null, "choices": [{"delta": {"content": "qwen 23 这是一个流式输出他会将每个字挨个的输出哈哈!!!", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "qwen", "object": "chat.completion.chunk", "system_fingerprint": null}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2221365.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【ROS2】构建导航工程

1、ROS小车组成 ROS小车由三大件组成:运动底盘、ROS主控、导航传感器。 1.1 运动底盘 运动底盘的硬件由车轮、电机(带编码器)、电机驱动器、STM32控制器、电池等组成。 涉及的知识点主要为:STM32单片机程序、机器人运动学分析 1)STM32单片机程序 单片机程序框架如下:…

在Linux命令行下载Google Drive大文件(解决Google Drive下载慢的问题)

文章目录 1、使用gdown命令2、复制链接3、替换为Linux下载链接 注意&#xff1a;在Linux命令行进行 1、使用gdown命令 wget只能下载小文件&#xff0c;大文件需要用到gdown pip install gdown# 如果不能够直接安装&#xff0c;使用以下命令 git clone https://github.com/wk…

基于Spring Boot + Vue程序员云书店系统设计与实现

&#x1f497;博主介绍&#x1f497;&#xff1a;✌在职Java研发工程师、专注于程序设计、源码分享、技术交流、专注于Java技术领域和毕业设计✌ 温馨提示&#xff1a;文末有 CSDN 平台官方提供的老师 Wechat / QQ 名片 :) Java精品实战案例《700套》 2025最新毕业设计选题推荐…

MySQL9.0安装教程zip手动安装(Windows)

本章教程&#xff0c;主要介绍如何在Windows上安装MySQL9.0&#xff0c;通过zip方式进行手动安装。 一、下载MySQL压缩包 下载地址&#xff1a;https://downloads.mysql.com/archives/community/ 二、解压MySQL压缩包 将下载好的压缩包&#xff0c;进行解压缩&#xff0c;并且将…

要让AI(任何一款绘图AI)把一个己有风格的图片画到一个实物商品上的窍门

本教程适合midjourney, comfyui, stable diffusion 己有图片 希望生成效果 我们希望&#xff0c;在一个现实世界真实IPhone手机上可以有一个这样的小魔女作为一个手机的展示&#xff0c;同时手机处于开机状态&#xff0c;在手机的屏幕上有一个这样的戴帽子的穿蓝色小披风的小…

阿里Dataworks使用循环节点和赋值节点完成对mongodb分表数据同步

背景 需求将MongoDB数据入仓MaxCompute 环境说明 MongoDB 100个Collections&#xff1a;orders_1、orders_2、…、orders_100 前期准备 1、MongoDB数据源配置 需要先保证DW和MongoDB网络是能够联通的&#xff0c;需要现在集成任务中配置MongoDB的数据源信息。 具体可以查…

Java项目-基于springboot框架的学习选课系统项目实战(附源码+文档)

作者&#xff1a;计算机学长阿伟 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、ElementUI等&#xff0c;“文末源码”。 开发运行环境 开发语言&#xff1a;Java数据库&#xff1a;MySQL技术&#xff1a;SpringBoot、Vue、Mybaits Plus、ELementUI工具&#xff1a;IDEA/…

Linux基础命令(入门)

linux 用户 root 用户 一个特殊的管理帐户 也被称为超级用户 root已接近完整的系统控制 对系统损害几乎有无限的能力 除非必要,不要登录为 root 普通&#xff08; 非特权 &#xff09; 用户权限有限 造成损害的能力比较有限 linux的哲学思想&#xff08;优点&#xf…

vue3学习记录-组件通信

vue3学习记录-组件通信 1.父子组件通信2.兄弟组件传值2.1 以父组件为媒介2.2 发布订阅模式2.3 使用mitt2.3.1 全局使用2.3.2 局部使用 1.父子组件通信 父组件&#xff1a; <template>父组件原有的title:{{ title }}<p>---</p><com :title"title&qu…

jmeter使用文档

文章目录 一、安装使用1、下载2、bin/jmeter.properties介绍 二、windows使用1、微调&#xff08;1&#xff09;界面样式&#xff08;2&#xff09;修改语言 2、简单使用3、各组件详解&#xff08;1&#xff09;CSV 数据文件配置&#xff08;2&#xff09;BeanShell取样器 三、…

【Linux线程】Linux多线程实践:深入生产者消费者模型

&#x1f4dd;个人主页&#x1f339;&#xff1a;Eternity._ ⏩收录专栏⏪&#xff1a;Linux “ 登神长阶 ” &#x1f339;&#x1f339;期待您的关注 &#x1f339;&#x1f339; ❀Linux多线程 &#x1f4d2;1. 生产者消费者模型&#x1f4dc;2. 基于BlockingQueue的生产者消…

BurpSuite渗透工具的简单使用

BurpSuite渗透工具 用Burp Suite修改请求 step1&#xff1a; 安装Burp Suite。官网链接&#xff1a;Burp Suite官网 step2&#xff1a; 设置代理 step3&#xff1a; 如果要拦截https请求&#xff0c;还需要在客户端安装证书 step4&#xff1a; 拦截到请求可以在Proxy ->…

进程地址空间与页表

目录 引言 问题导入 进程地址空间 宏观的过程去分析 谈细节 1.进程地址空间究竟是什么 2.页表 3.进程被挂起 4.页表的存在&#xff0c;让进程管理和内存管理相连动 本文核心逻辑 引言 在当今的计算世界中&#xff0c;操作系统是管理计算机硬件和软件资源的关键组件。而…

联想与Meta合作基于Llama大模型推出面向PC的个人AI智能体——AI Now | LeetTalk Daily...

“LeetTalk Daily”&#xff0c;每日科技前沿&#xff0c;由LeetTools AI精心筛选&#xff0c;为您带来最新鲜、最具洞察力的科技新闻。 联想集团昨日在美国西雅图召开年度Tech World大会。联想CEO杨元庆在主题演讲中&#xff0c;与Meta创始人兼CEO马克扎克伯格一道宣布&#x…

MYSQL-建库、建表,并创建表的详细信息

首先&#xff0c;创建一个&#xff1a;图书管理系统数据库。 1、创建用户表&#xff1a; 2、创建图书表&#xff1a; 3、创建借阅登记表&#xff1a;

Android使用协程实现自定义Toast

Android使用协程实现自定义Toast ​ 最近有个消息提示需要显示10s,刚开始使用协程写了一个shoowToast方法&#xff0c;传入消息内容、显示时间和toast显示类型即可&#xff0c;以为能满足需求&#xff0c;结果测试说只有5s&#xff0c;查看日志和源码发现Android系统中Toast显…

尚硅谷spark学习

p4 快速上手 -开发环境准备

基于工业互联网平台的智能工厂辅助制造企业数字化转型

制造业数字化转型已是大势所趋&#xff0c;工业互联网平台对于制造业数字化转型的支撑作用将会越来越强&#xff0c;其应用为制造企业生产和运营优化的能力提升提供了探索应用模式和路径。平台的不断创新和应用突破&#xff0c;将不断为制造业的升级转型赋能。实施制造业数字化…

C#线程详解及应用示例

简介 在编写应用程序实现业务功能过程中&#xff0c;为解决吞吐量和响应效率的问题&#xff0c;我们会用到多线程、异步编程两项重要的技术。通过它们来提高应用程序响应和高效。应用程序每次运行都会启动一个进程&#xff08;进程是一种正在执行的程序&#xff09;&#xff0…

基于node.js宜家宜业物业管理系统【附源码】

基于node.js宜家宜业物业管理系统 效果如下&#xff1a; 系统首页界面 业主登录界面 停车位页面 小区公告页面 管理员登录界面 管理员功能界面 物业管理员管理界面 缴费信息管理界面 物业管理员功能界面 研究背景 近年来互联网技术飞速发展&#xff0c;给人们的生活带来了极…