This deployment follows the ChatGLM3 setup and refers to the Qwen model documentation. Model page: https://modelscope.cn/models/qwen/Qwen1.5-1.8B-Chat/summary
HTTP API
- Server code: api.py
from fastapi import FastAPI, Request
from transformers import AutoTokenizer, AutoModelForCausalLM
import uvicorn
import datetime
import torch

# Device settings
DEVICE = "cuda"  # use CUDA
DEVICE_ID = "0"  # CUDA device ID; empty means the default device
CUDA_DEVICE = f"{DEVICE}:{DEVICE_ID}" if DEVICE_ID else DEVICE  # combined CUDA device string

# Load the pretrained tokenizer and model
model_name_or_path = '/root/autodl-tmp/qwen/Qwen1.5-1.8B-Chat'
tokenizer = AutoTokenizer.from_pretrained(model_name_or_path, use_fast=False)
model = AutoModelForCausalLM.from_pretrained(model_name_or_path, device_map="auto", torch_dtype=torch.bfloat16)

# Free GPU memory
def torch_gc():
    if torch.cuda.is_available():  # check whether CUDA is available
        with torch.cuda.device(CUDA_DEVICE):  # select the CUDA device
            torch.cuda.empty_cache()  # clear the CUDA cache
            torch.cuda.ipc_collect()  # collect CUDA IPC memory

# Create the FastAPI app
app = FastAPI()

# Endpoint handling POST requests
@app.post("/")
async def create_item(request: Request):
    global model, tokenizer  # use the globally loaded model and tokenizer
    json_post = await request.json()  # parse the JSON body of the POST request
    prompt = json_post.get('prompt')  # read the prompt from the request
    messages = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": prompt}
    ]
    # Run chat generation
    text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    model_inputs = tokenizer([text], return_tensors="pt").to(model.device)
    generated_ids = model.generate(model_inputs.input_ids, max_new_tokens=512)
    # Keep only the newly generated tokens, stripping the prompt
    generated_ids = [
        output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
    ]
    response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
    now = datetime.datetime.now()  # current time
    time = now.strftime("%Y-%m-%d %H:%M:%S")  # format the time as a string
    # Build the JSON response
    answer = {
        "response": response,
        "status": 200,
        "time": time
    }
    # Build the log line
    log = f'[{time}] prompt: "{prompt}", response: {repr(response)}'
    print(log)  # print the log
    torch_gc()  # free GPU memory
    return answer  # return the response

# Entry point
if __name__ == '__main__':
    # Start the FastAPI app. Port 6006 can be mapped from autodl to the local
    # machine so the API can be called locally.
    # Note: with workers=2 each worker process loads its own copy of the model.
    uvicorn.run("api:app", host='127.0.0.1', port=6006, workers=2)
    # gunicorn api:app -w 3 -k uvicorn.workers.UvicornWorker -b 127.0.0.1:6006
- Client code: clientapi.py
import requests
import json

def get_completion(prompt):
    headers = {'Content-Type': 'application/json'}
    data = {"prompt": prompt}
    response = requests.post(url='http://127.0.0.1:6006', headers=headers, data=json.dumps(data))
    return response.json()['response']

if __name__ == '__main__':
    print(get_completion('你可以记录之前说过的内容吗'))  # "Can you remember what was said earlier?"
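Before reaching for Locust, a quick way to see how the service behaves under a couple of parallel requests is a thread pool around get_completion. This is only a minimal sketch (the prompts are arbitrary examples); it assumes it runs in the same file as clientapi.py:

import time
from concurrent.futures import ThreadPoolExecutor

if __name__ == '__main__':
    prompts = ['你是谁', '你知道珠穆朗玛峰吗']  # "Who are you?", "Do you know Mount Everest?"
    start = time.time()
    # Fire both requests in parallel and print the replies in submission order
    with ThreadPoolExecutor(max_workers=2) as pool:
        for reply in pool.map(get_completion, prompts):
            print(reply)
    print(f"elapsed: {time.time() - start:.1f}s")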
- Load-testing code: locusthttp.py. Run it with locust -f locusthttp.py (this starts the web UI), and use vmstat or top to watch processes, memory, CPU usage, and so on.
import json
from locust import HttpUser, TaskSet, task, between

# Define the user behavior
class UserBehavior(TaskSet):
    # Runs once for each simulated user before its tasks execute
    def on_start(self):
        print('Starting the load test')

    # @task marks a method as a task; its argument is the weight: the larger it is, the more
    # often virtual users pick the task (default 1). The API only exposes POST /, so each task
    # posts a prompt to the root path; the name= argument keeps the statistics separate per prompt.
    @task(2)
    def index2(self):
        headers = {'Content-Type': 'application/json'}
        data = {'prompt': '你知道珠穆朗玛峰吗'}  # "Do you know Mount Everest?"
        self.client.post(url='/', headers=headers, data=json.dumps(data), name='index2')

    @task(2)
    def index3(self):
        headers = {'Content-Type': 'application/json'}
        data = {'prompt': '你是谁'}  # "Who are you?"
        self.client.post(url='/', headers=headers, data=json.dumps(data), name='index3')

    @task(2)
    def index4(self):
        headers = {'Content-Type': 'application/json'}
        data = {'prompt': '西红柿炒番茄怎么做'}  # "How do you cook tomatoes stir-fried with tomatoes?"
        self.client.post(url='/', headers=headers, data=json.dumps(data), name='index4')

# Load-test configuration
class WebsiteUser(HttpUser):
    # Points to the user-behavior class defined above.
    tasks = [UserBehavior]
    # Wait 3-6 seconds between tasks (modern Locust uses wait_time instead of the old
    # min_wait/max_wait millisecond attributes; a TaskSet-level wait_time takes precedence).
    wait_time = between(3, 6)
    # How many seconds Locust waits for a running task when stopping; None means no timeout.
    stop_timeout = 5
    # Weight of this user class: the larger the value, the more often it is picked. A single
    # locustfile can define several HttpUser subclasses and assign them different weights.
    weight = 3
    # With host set in the script, --host does not need to be passed on the command line.
    host = "http://127.0.0.1:6006"
WebSocket persistent connection
- Server code: websocketapi.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from fastapi.middleware.cors import CORSMiddleware
from transformers import AutoTokenizer, AutoModelForCausalLM
import uvicorn
import torch

pretrained = "/root/autodl-tmp/qwen/Qwen1.5-1.8B-Chat"
tokenizer = AutoTokenizer.from_pretrained(pretrained, use_fast=False)
model = AutoModelForCausalLM.from_pretrained(pretrained, device_map="auto", torch_dtype=torch.bfloat16)
model = model.eval()

app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # wide-open CORS for the demo; restrict this in production
)

with open('websocket.html') as f:  # the frontend page shown below
    html = f.read()

@app.get("/")
async def get():
    return HTMLResponse(html)

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """
    input: JSON string of {"prompt": ""}
    output: JSON string of {"response": "", "status": 202},
    followed by {"status": 200} once the answer is complete
    """
    await websocket.accept()
    try:
        while True:
            json_request = await websocket.receive_json()
            prompt = json_request['prompt']
            messages = [
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": prompt}
            ]
            text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
            model_inputs = tokenizer([text], return_tensors="pt").to(model.device)
            generated_ids = model.generate(model_inputs.input_ids, max_new_tokens=512)
            # Keep only the newly generated tokens, stripping the prompt
            generated_ids = [
                output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
            ]
            response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
            await websocket.send_json({
                "response": response,
                "status": 202,
            })
            await websocket.send_json({"status": 200})
    except WebSocketDisconnect:
        pass

def main():
    uvicorn.run("websocketapi:app", host='127.0.0.1', port=6006, workers=2)

if __name__ == '__main__':
    main()
    # gunicorn websocketapi:app -w 3 -k uvicorn.workers.UvicornWorker -b 127.0.0.1:6006
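Before load-testing the WebSocket endpoint, it is worth smoke-testing it with the websocket-client package (the same library the Locust script below uses). A minimal sketch, assuming the server above is running on port 6006:

import json
from websocket import create_connection  # pip install websocket-client

ws = create_connection("ws://127.0.0.1:6006/ws")
ws.send(json.dumps({"prompt": "你好"}))  # "Hello"
while True:
    reply = json.loads(ws.recv())
    print(reply)
    if reply.get("status") == 200:  # the server sends status 200 once the answer is complete
        break
ws.close()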
- Frontend code: websocket.html
<!DOCTYPE html>
<html lang="en">
<head>
    <title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="return false;" id="form">
    <label for="messageText"></label>
    <input type="text" id="messageText" autocomplete="off"/>
    <button type="submit">Send</button>
</form>
<ul id='messageBox'>
</ul>
<script>
    let ws = new WebSocket("ws://" + location.host + "/ws");
    let last_message_element = null;

    function appendMessage(text, sender, dom = null) {
        if (dom === null) {
            let messageBox = document.getElementById('messageBox');
            dom = document.createElement('li');
            messageBox.appendChild(dom);
        }
        dom.innerText = sender + ': ' + text;
        return dom;
    }

    function sendMessage(event) {
        if (last_message_element !== null) {  // the bot has not finished replying yet
            return;
        }
        let input = document.getElementById("messageText");
        if (input.value === "") {
            return;
        }
        let body = {"prompt": input.value};
        ws.send(JSON.stringify(body));
        appendMessage(input.value, 'User');
        input.value = '';
        event.preventDefault();
    }

    document.getElementById("form").addEventListener('submit', sendMessage);

    ws.onmessage = function (event) {
        let body = JSON.parse(event.data);
        let status = body['status'];
        if (status === 200) {  // the answer has finished
            last_message_element = null;
        } else {
            last_message_element = appendMessage(body['response'], 'Qwen1.5-1.8B-Chat', last_message_element);
        }
    };
</script>
</body>
</html>
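Open http://127.0.0.1:6006/ in a browser: the GET / route of websocketapi.py serves this page, and the script connects back to /ws on the same host and port.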
- Run result
- Load-testing code: locustwebsocket.py
import json
import time
import random
import websocket  # pip install websocket-client
from locust import User, TaskSet, task, events, between

class WebSocketClient(object):
    def __init__(self, host):
        self.host = host
        self.ws = websocket.WebSocket()

    def connect(self, burl=None):
        burl = burl or self.host
        start_time = time.time()
        try:
            self.ws.connect(burl)
        except websocket.WebSocketTimeoutException as e:
            total_time = int((time.time() - start_time) * 1000)
            # Locust 2.x reports results through a single request event
            events.request.fire(request_type="websocket", name='connect', response_time=total_time, response_length=0, exception=e)
        else:
            total_time = int((time.time() - start_time) * 1000)
            events.request.fire(request_type="websocket", name='connect', response_time=total_time, response_length=0, exception=None)

    def recv(self):
        return self.ws.recv()

    def send(self, msg):
        self.ws.send(msg)

# Base user whose client is a WebSocketClient instead of an HTTP session
class WebsocketLocust(User):
    abstract = True  # not instantiated directly by Locust

    def __init__(self, *args, **kwargs):
        super(WebsocketLocust, self).__init__(*args, **kwargs)
        self.client = WebSocketClient("ws://127.0.0.1:6006/ws")

class WebsocketUser(TaskSet):
    def on_start(self):
        self.client.connect()

    @task
    def send_message(self):
        # Send a prompt, then read messages until the server ends the answer with status 200
        num = random.randint(0, 10)
        prompt = f"世界上第{num}高的山峰是什么"  # "What is the {num}th highest mountain in the world?"
        self.client.send(json.dumps({'prompt': prompt}))
        while True:
            response = json.loads(self.client.recv())
            print(response)
            if response.get('status') == 200:
                break

class WebsiteUser(WebsocketLocust):
    host = "http://127.0.0.1:6006"
    tasks = [WebsocketUser]
    # Wait 3-6 seconds between tasks (modern Locust uses wait_time instead of min_wait/max_wait)
    wait_time = between(3, 6)
    stop_timeout = 5
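Run it the same way as the HTTP test, locust -f locustwebsocket.py, then drive the load from the web UI and watch vmstat or top on the server side.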