Python 协程
Python 生成器的作用
协程在多个模型流式输出中的使用实例
服务端
import json
from fastapi import FastAPI, Query, Path
import uvicorn
from starlette.requests import Request
from sse_starlette import EventSourceResponse
import asyncio
from asyncio import Queue
# Application instance; the route decorators below register their handlers on it.
app = FastAPI()
@app.get("/v1/models")
async def get_models():
    """Return the models exposed by this demo server in an OpenAI-style listing.

    Returns:
        A dict whose "data" key holds one entry per model (id, object type,
        owner and an empty permission list) and whose "object" key is "list".
    """
    model_ids = ("Qwen1.5-7B", "chatglm3-6b")
    entries = [
        {
            "id": model_id,  # model identifier
            "object": "model",  # entry category
            "owned_by": "organization-owner",  # model owner
            "permission": [],  # permissions are not supported yet
        }
        for model_id in model_ids
    ]
    # "object" describes the type of the "data" payload.
    return {"data": entries, "object": "list"}
async def output_data(text: str, model: str, delay: float = 1):
    """Stream *text* character by character as OpenAI-style chunk JSON strings.

    Each yielded item is a JSON document shaped like a
    ``chat.completion.chunk``. Its ``delta.content`` is
    ``"<model> <index> <text so far>"``, i.e. the text accumulated up to and
    including the current character rather than only the new character.

    Args:
        text: The text to stream; one chunk is produced per character.
        model: Model name, echoed in the chunk and in its content.
        delay: Seconds to sleep after each chunk. Defaults to 1, the pause
            that was previously hard-coded.

    Yields:
        One JSON string per character of *text* (nothing for empty text).
    """
    output = ""
    for idx, word in enumerate(text):
        output += word
        chunk = {
            "id": None,
            "choices": [
                {
                    "delta": {
                        "content": f"{model} {idx} {output}",
                        "function_call": None,  # present in OpenAI replies; unused here
                        "role": "assistant",  # message role
                        "tool_calls": None,  # present in OpenAI replies; unused here
                    },
                    # NOTE: every chunk reports "length", not only the last one.
                    "finish_reason": "length",
                    "index": 0,
                    "logprobs": None,
                }
            ],
            "created": 1715238637,  # fixed timestamp
            "model": model,  # model id
            "object": "chat.completion.chunk",  # message type
            "system_fingerprint": None,
        }
        # ensure_ascii=False keeps non-ASCII characters readable in the payload.
        yield json.dumps(chunk, ensure_ascii=False)
        # Sleeping hands control back to the event loop, which lets several
        # generators interleave their output.
        await asyncio.sleep(delay)
@app.post("/v1/chat/completions")
async def flush_stream(request: Request):
    """Stream the merged output of several models as server-sent events.

    One producer task per model pushes its chunks into a shared queue. A
    consumer drains the queue and, after every new chunk, the handler emits
    a JSON array holding the latest chunk received from each model.

    Args:
        request: The incoming request. It is not inspected; the model list
            and the streamed texts are hard-coded.

    Returns:
        An EventSourceResponse wrapping the merged stream.
    """
    models = ["chatglm3", "qwen"]

    async def async_generate(index: int, model: str, queue: Queue):
        """Push every chunk produced for *model* into *queue*, tagged with *index*."""
        text = "这是一个流式输出他会将每个字挨个的输出哈哈!!!"
        if model == "chatglm3":
            text = "我是chatglm3的流式输出嘿嘿!!!"
        try:
            async for item_data in output_data(text=text, model=model):
                queue.put_nowait((index, item_data))
        finally:
            # A None payload marks this producer as done. Sending it from
            # `finally` means a failing producer cannot leave the consumer
            # waiting for the full timeout.
            queue.put_nowait((index, None))

    async def async_consumer(queue: Queue, indices: list, timeout: float):
        """Yield ``(index, payload)`` pairs until every producer has finished.

        Stops early when no item arrives within *timeout* seconds.
        """
        pending = set(indices)
        finished = set()
        while finished != pending:
            try:
                index, response = await asyncio.wait_for(queue.get(), timeout)
            except asyncio.TimeoutError:
                # asyncio.TimeoutError is only an alias of the builtin
                # TimeoutError from Python 3.11 on; naming it explicitly
                # also catches the timeout on older interpreters.
                break
            if response is None:
                finished.add(index)
                print("consumer queue indices finished", pending, finished)
            yield (index, response)

    async def async_process(models: list):
        """Merge the per-model streams into one stream of JSON lines."""
        # 1. Streaming output for several models at once.
        queue = Queue()
        tasks = [
            asyncio.create_task(async_generate(index=index, model=model, queue=queue))
            for index, model in enumerate(models)
        ]
        # Slot i holds the latest chunk from model i ({} until one arrives).
        all_text = [dict() for _ in models]
        try:
            async for index, response in async_consumer(
                queue=queue, indices=list(range(len(tasks))), timeout=10
            ):
                if response is not None:
                    all_text[index] = response
                    print(f"process ####### {all_text}")
                    yield json.dumps(all_text) + "\n"
            print("process END END")
        finally:
            # Stop producers that are still running, e.g. after a timeout or
            # when the client disconnects. Cancelling a finished task is a no-op.
            for task in tasks:
                task.cancel()
        # 2. For a single model, iterate output_data(...) directly and yield
        #    each item instead of going through the queue.

    return EventSourceResponse(async_process(models), media_type="text/event-stream")
# Start the server only when this file is run as a script, not when imported.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8080)
运行:
> python.exe .\main.py
INFO: Started server process [12872]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:8080 (Press CTRL+C to quit)
INFO: 127.0.0.1:64346 - "POST /v1/chat/completions HTTP/1.1" 200 OK
process ####### ['{"id": null, "choices": [{"delta": {"content": "chatglm3 0 我", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "chatglm3", "object": "chat.completion.chunk", "system_fingerprint": null}', {}]
process ####### ['{"id": null, "choices": [{"delta": {"content": "chatglm3 0 我", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "chatglm3", "object": "chat.completion.chunk", "system_fingerprint": null}', '{"id": null, "choices": [{"delta": {"content": "qwen 0 这", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "qwen", "object": "chat.completion.chunk", "system_fingerprint": null}']
......
process ####### ['{"id": null, "choices": [{"delta": {"content": "chatglm3 19 我是chatglm3的流式输出嘿嘿!!!", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "chatglm3", "object": "chat.completion.chunk", "system_fingerprint": null}', '{"id": null, "choices": [{"delta": {"content": "qwen 23 这是一个流式输出他会将每个字挨个的输出哈哈!!!", "function_call": null, "role": "assistant",
"tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "qwen", "object": "chat.completion.chunk", "system_fingerprint": null}']
consumer queue indices finished {0, 1} {0, 1}
process END END
客户端
from openai import OpenAI
# Point the OpenAI SDK at the local demo server; a placeholder API key is used.
client = OpenAI(
    api_key="EMPTY",
    base_url="http://127.0.0.1:8080/v1/",
)

# Request a streamed chat completion. Multi-turn history can be supplied by
# adding further {"role": ..., "content": ...} entries to `messages`.
response = client.chat.completions.create(
    model="EMPTY",
    messages=[
        {"role": "user", "content": "你好"},
    ],
    functions=None,
    temperature=1,
    top_p=0,
    max_tokens=20,
    stream=True,
)
print("#####", response)

for part in response:
    # Each event is printed as a two-element sequence, one slot per model
    # streamed by the server: slot 0 first, then slot 1.
    print("\n33333", type(part), part[0])
    print("33333", type(part), part[1])
运行:
> python.exe .\stream.py
##### <openai.Stream object at 0x0000024D1C322590>
33333 <class 'list'> {"id": null, "choices": [{"delta": {"content": "chatglm3 0 我", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "chatglm3", "object": "chat.completion.chunk", "system_fingerprint": null}
33333 <class 'list'> ChatCompletionChunk(id=None, choices=None, created=None, model=None, object=None, service_tier=None, system_fingerprint=None, usage=None)
33333 <class 'list'> {"id": null, "choices": [{"delta": {"content": "chatglm3 0 我", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "chatglm3", "object": "chat.completion.chunk", "system_fingerprint": null}
33333 <class 'list'> {"id": null, "choices": [{"delta": {"content": "qwen 0 这", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "qwen", "object": "chat.completion.chunk", "system_fingerprint": null}
......
33333 <class 'list'> {"id": null, "choices": [{"delta": {"content": "chatglm3 19 我是chatglm3的流式输出嘿嘿!!!", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "chatglm3", "object": "chat.completion.chunk", "system_fingerprint": null}
33333 <class 'list'> {"id": null, "choices": [{"delta": {"content": "qwen 23 这是一个流式输出他会将每个字挨个的输出哈哈!!!", "function_call": null, "role": "assistant", "tool_calls": null}, "finish_reason": "length", "index": 0, "logprobs": null}], "created": 1715238637, "model": "qwen", "object": "chat.completion.chunk", "system_fingerprint": null}