1. 流式输出和非流失输出:
大模型的流式输出(Streaming Output)和非流式输出(Non-streaming Output)是指在生成文本或其他输出时,如何将结果返回给用户或下游系统。
流式输出 (Streaming Output):流式输出意味着模型生成的内容是逐步、逐块返回的,而不是等到整个生成过程完成后再一次性返回所有内容。
优点:
用户可以更早地开始看到部分结果,提高用户体验。
对于长时间生成任务,可以减少内存占用,因为不需要一次性存储完整的输出。
更适合实时应用,如在线聊天机器人,其中即时反馈是非常重要的。
应用场景:适用于需要即时响应的场景,比如对话系统、直播翻译等。
非流式输出 (Non-streaming Output):非流式输出是指模型在完成整个生成过程后,一次性返回全部生成的结果。
**优点:
实现简单,易于处理和调试。
在某些情况下,可能更适合那些需要对完整输出进行后续处理的应用。
应用场景:适用于那些不依赖即时反馈、或者需要对整个输出进行整体处理的场景,例如批量文本生成、文档摘要等。
2. 流式输出、非流失输出和vllm的同步、异步关系
对于vllm同步:无论是流式还是非流式输出,vllm的LLM函数创建的模型对象通常以同步的方式工作,处理多并发情况时只能以队列形式一个个输出。对于非流式输出,它会阻塞直到生成完成并返回结果;对于流式输出,它也可以逐步返回数据给前端,但这是假流式,因为后端以及把所有的文本都输出了,然后我们又把文本一个个传给前端。
对vllm异步:异步引擎同样可以支持流式和非流式输出,但它允许你以非阻塞的方式处理这些输出。你可以启动一个生成任务而不等待它完成,然后根据需要逐步获取流式输出,或者在任务完成后一次性获取非流式输出(也是并发状态)。这为高并发环境下的应用提供了更好的性能和灵活性。
总结:流式输出和非流式输出关注的是输出的传输方式,而AsyncLLMEngine和LLM则更多地涉及到执行模式(同步 vs 异步)。两者可以组合使用,例如,你可以使用AsyncLLMEngine来异步地处理流式输出,从而在高并发环境中获得最佳性能和用户体验。
3. fastapi流式响应代码
from fastapi import FastAPI
import uvicorn
import os
import json
import time
from starlette.responses import StreamingResponse
# from fastapi.responses import StreamingResponse
# 创建一个FastAPI应用程序实例
app = FastAPI()
@app.post("/api")
def aaa():
# StreamingResponse是一个提供的用于包装流式响应的类,必须以json字符串进行数据传递
# starlette.responses和fastapi.responses中的StreamingResponse对象实现方式基本类似。
return StreamingResponse(handle_post_request())
def handle_post_request():
for i in range(5):
print(i)
time.sleep(1)
yield json.dumps({'type': i}) + '\n'
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("API_PORT", 7777)), workers=1)
开启上述api后,使用以下代码发送请求:
import json
import requests
# url_api = "http://10.4.0.141:800h1/cat/stream"
url_api = "http://localhost:7777/api"
# 此调用方法为推荐方法。也可直接request.post发送请求,见https://editor.csdn.net/md/?articleId=144560081
with requests.post(url_api, stream=True) as r:
r.raise_for_status() # 检查请求是否成功
print(r.iter_lines())
for line in r.iter_lines():
if line: # 过滤掉保持连接的空行
print(json.loads(line.decode('utf-8')))
4. LLM流式输出代码(非接口型,仅后端)
普通后端的流式输出使用TextStreamer(是同步的,不适合做api的流式响应)
from modelscope import AutoModelForCausalLM, AutoTokenizer
from transformers import TextStreamer
model_name = "地址"
model = AutoModelForCausalLM.from_pretrained(
model_name,
torch_dtype="auto",
device_map="auto"
)
tokenizer = AutoTokenizer.from_pretrained(model_name)
prompt = "你是谁"
messages = [
{"role": "system", "content": "You are Qwen, created by Alibaba Cloud. You are a helpful assistant."},
{"role": "user", "content": prompt}
]
text = tokenizer.apply_chat_template(
messages,
tokenize=False,
add_generation_prompt=True
)
model_inputs = tokenizer([text], return_tensors="pt").to(model.device)
# 非流输出
# generated_ids = model.generate(
# **model_inputs,
# max_new_tokens=1024
# )
# generated_ids = [
# output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
# ]
# response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
# print(response)
# 流输出,普通后端的流式输出使用TextStreamer,这是同步的,不适合于api交互做流式响应,必须使用TextIteratorStreamer,它是异步的
streamer = TextStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
generated_ids = model.generate(
**model_inputs,
max_new_tokens=512,
streamer=streamer,
)
5. LLM流式响应代码(接口型FastAPI)
API的流式输出使用TextIteratorStreamer(是异步的,适合做api的流式响应)
from fastapi import FastAPI
import uvicorn
import os
import json
from starlette.responses import StreamingResponse
# from fastapi.responses import StreamingResponse
from transformers import TextIteratorStreamer
from threading import Thread
from modelscope import AutoModelForCausalLM, AutoTokenizer
# 创建一个FastAPI应用程序实例
app = FastAPI()
model_name = "地址"
model = AutoModelForCausalLM.from_pretrained(
model_name,
torch_dtype="auto",
device_map="auto"
)
tokenizer = AutoTokenizer.from_pretrained(model_name)
prompt = "写一篇800字的作文"
messages = [
{"role": "system", "content": "You are Qwen, created by Alibaba Cloud. You are a helpful assistant."},
{"role": "user", "content": prompt}
]
text = tokenizer.apply_chat_template(
messages,
tokenize=False,
add_generation_prompt=True
)
model_inputs = tokenizer([text], return_tensors="pt").to(model.device)
@app.post("/api")
def aaa():
# StreamingResponse包装的是可迭代对象
return StreamingResponse(handle_post_request())
def handle_post_request():
# TextIteratorStreamer为异步。skip_prompt=True, skip_special_tokens=True可以去除输出中的|im_start|等标记
streamer = TextIteratorStreamer(tokenizer, timeout=60.0, skip_prompt=True, skip_special_tokens=True)
generation_kwargs = {
"max_new_tokens": 1024, # 或者任何其他生成参数
"streamer": streamer,
}
thread = Thread(target=model.generate, kwargs={**model_inputs, **generation_kwargs})
thread.start()
answer = ''
for new_text in streamer:
answer += new_text
print(answer)
yield json.dumps({'content': answer}) + '\n'
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("API_PORT", 7777)), workers=1)
前端的调用为
import json
import requests
# url_api = "http://10.4.0.141:800h1/cat/stream"
url_api = "http://localhost:7777/api"
with requests.post(url_api, stream=True) as r:
r.raise_for_status() # 检查请求是否成功
print(r.iter_lines())
for line in r.iter_lines():
if line: # 过滤掉保持连接的空行
print(json.loads(line.decode('utf-8')))
6. 注意
非流式响应:fastapi之间的通信一般都是json对象的形式,发送请求时请求体为{},后端返回请求时的响应体也为{}。
流式响应:fastapi之间的通信一般都是json对象字符串格式的形式。使用json.dumps()将 str或者python对象(basemodel或者dict)转为json字符串格式。且流式响应一定要使用StreamingResponse对象包装