Notes on the pitfalls I ran into at work while deploying the qwen2.5-vl multimodal LLM across multiple machines and multiple GPUs.
The first big trap is the official image qwenllm/qwenvl:2.5-cu121: on Titan GPUs it throws cuda error: no kernel image is available for execution on the device, which means the CUDA kernels are incompatible with the GPU, yet other hand-built cuda12 images run fine. Reinstalling cuda11.8 on top of the official image made it work, but calls from the gradio page would then hang, so in the end I built the environment from scratch. There is nothing technically interesting about building the docker image itself, so I will not cover it here.
1. Container Configuration
We deploy with ray + vllm in distributed mode, using the host network. When not every GPU on a machine is available, use the CUDA_VISIBLE_DEVICES variable to tell ray which GPU indices it may use. This is normally set when the container is started; if the set of usable GPUs changes later, stop the ray cluster, set the variable manually inside the container, and then rebuild the cluster.
In addition, inter-machine communication needs an explicit network interface. A host-mode container uses the host's NICs directly; check the available interfaces with ifconfig. The NIC name can differ from machine to machine, so pass the current machine's interface name into the container as an environment variable at startup. The example below covers the case of two machines in the cluster using different NICs.
Here is a docker-compose file for starting the container. Containers are not designated as head or worker at this point; that is decided later when the cluster is built inside the containers.
version: "3"
services:
  # Service name; it does not have to match container_name. Nginx can route by this service name.
  yblir_qwen2.5-vl:
    image: qwen2.5-vl:cu12-torch25-vllm073-dist
    container_name: dist_head_qwen2.5-vl
    # Grants host root privileges; also needs to be true for port mapping.
    privileged: true
    # Shared memory size, 32 GB, to avoid dataloader errors in distributed runs.
    shm_size: "32gb"
    # Path mapping, host directory : container directory
    volumes:
      - /home/data/liyabin_project:/home
    # ports:
    #   - "10086:22"     # map container port 22 to host port 10086; port 22 is typically used to reach the container's Python env from outside
    #   - "10087:10087"  # reserved, e.g. tensorboard --logdir=path --port 10087
    # Variables passed in from outside
    environment:
      - NVIDIA_VISIBLE_DEVICES=0,3,4
      - GLOO_SOCKET_IFNAME=enp134s0f0
      - NCCL_SOCKET_IFNAME=enp134s0f0
    # Restart automatically, e.g. if someone else kills the container.
    restart: always
    entrypoint: /bin/bash
    # command: ["-c", "ray start --head --dashboard-host=0.0.0.0 --port 6379"]
    tty: true
    # "bridge" is the default; use it when containers do not need to talk to each other.
    # "host" makes the ports mapping above ineffective; the container is not network-isolated and uses the host's ports and IP.
    network_mode: "host"
Note: the environment of every container, and the model file path inside each container, must be identical.
vllm ships an official deployment script, but it is awkward to use, so here we build the cluster by hand, which is more flexible. As mentioned above, the head and worker roles are assigned when the cluster is built.
In the container you want as the head node, run:
ray start --head --dashboard-host=0.0.0.0 --port=6379
In each worker container, run:
ray start --address='xx.xx.xx.xx:6379'
There can be only one head node; there can be multiple workers.
Run ray status in any container to see the devices of every node in the cluster.
The other common command is ray stop, which stops the cluster: run on the head node it tears down the whole cluster, run on a worker it only detaches that node.
As far as ray goes, these two commands are enough to handle the vast majority of cluster issues; a programmatic check is sketched below.
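If you prefer checking from Python rather than the CLI, here is a minimal sketch (assuming ray is importable inside the same container) that attaches to the running cluster and prints its resources:

import ray

ray.init(address="auto")            # attach to the cluster started by `ray start`
print(ray.cluster_resources())      # total CPUs / GPUs / memory registered across all nodes
for node in ray.nodes():            # per-node view, including whether the node is still alive
    print(node["NodeManagerAddress"], node["Alive"], node["Resources"].get("GPU", 0))
ray.shutdown()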
When the head node starts the cluster, it prints a dashboard URL where you can monitor device usage from a browser.
If everything goes well, the cluster is now up. In practice, all kinds of pitfalls show up along the way. For example, when I built a cluster on a 2x8 2080 Ti setup, the worker node kept dropping out shortly after joining, most likely because inter-machine communication was unstable. So, keep the cluster machines on the same network segment whenever possible.
2. Environment Testing
Once the cluster is up, test the environment to make sure the code actually runs end to end.
Deploying qwen2.5-vl requires at least vllm>=0.7.2 and transformers>=4.49; a quick version check follows.
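A simple way to confirm the installed versions inside each container (plain library version attributes, nothing model-specific):

import transformers
import vllm

print("vllm:", vllm.__version__)                   # needs >= 0.7.2
print("transformers:", transformers.__version__)   # needs >= 4.49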
Use the vllm command line to start the quantized 3B model as a test:
vllm serve /home/Qwen2.5-VL-3B-Instruct-AWQ --tensor-parallel-size 2 --pipeline-parallel-size 2 --dtype float16 \
--port 8811 --gpu-memory-utilization 0.5 --max-num-seqs 2 --max-model-len 4096 --enforce-eager
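Once the server is up, a quick smoke test against the OpenAI-compatible endpoint it exposes looks roughly like this (port and model path taken from the command above; the image URL is only a placeholder):

import requests

payload = {
    "model": "/home/Qwen2.5-VL-3B-Instruct-AWQ",
    "messages": [{
        "role": "user",
        "content": [
            {"type": "image_url", "image_url": {"url": "https://example.com/test.jpg"}},
            {"type": "text", "text": "Describe this image briefly."},
        ],
    }],
    "max_tokens": 128,
}
resp = requests.post("http://127.0.0.1:8811/v1/chat/completions", json=payload, timeout=120)
print(resp.json()["choices"][0]["message"]["content"])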
- Problem 1: cuda error: no kernel image is available for execution on the device
The error suggests the CUDA kernels are incompatible with the GPU. I first assumed the dependencies and CUDA were simply too new for Titan cards, but repeated troubleshooting led nowhere, so I reinstalled cuda11.8 and the problem went away. Later I built a cuda12.1 image from scratch and it also worked fine, so the blame falls on the official qwen image.
- Problem 2: incompatible attention compute backend
xformers wasn't build with CUDA support
requires device with capability > (8, 0) but your GPU has capability (7, 5)
Cause: the attention backend has to be compiled for the device it runs on. Here, the prebuilt xformers wheel was compiled for other GPU architectures, so it errors out at runtime.
Download the xformers source, build and install it, then inspect the installation details:
python -m xformers.info
- Problem 3: quantization issue
The input size is not aligned with the quantized weight shape. This can be caused by too large tensor parallel size
The early official Qwen2.5-VL-72B-Instruct-AWQ release had this problem; as I recall the FFN weights did not divide evenly, so settings like --tensor-parallel-size 2, 4 or 8 were not supported. The newer official models have fixed this, so it is no longer worth dwelling on.
- Problem 4: max_model_len setting
ValueError: The model's max seq len (10240) is larger than the maximum number of tokens that can be stored in KV cache (3408). Try increasing 'gpu_memory_utilization' or decreasing 'max_model_len' when initializing the engine.
Cause: the configured maximum context length exceeds the number of tokens the KV cache can hold, which is one symptom of insufficient GPU memory. Increase gpu_memory_utilization or decrease max_model_len to resolve it.
max_model_len is passed in as an argument, while the KV cache is sized at engine initialization, so the two pull against each other: reducing max_model_len from 10240 to 5120 raised the number of available CUDA blocks from 213 to 2090. The arithmetic below shows how the block count maps to the token count in the error message.
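A back-of-the-envelope check, assuming vLLM's default KV-cache block size of 16 tokens per block:

block_size = 16                  # vLLM default; configurable via --block-size
print(213 * block_size)          # 3408  -> matches "can be stored in KV cache (3408)" in the error
print(2090 * block_size)         # 33440 -> far more headroom once max_model_len drops to 5120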
- Problem 5: another symptom of insufficient GPU memory
torch.OutOfMemoryError: CUDA out of memory. Tried to allocate 640.00 MiB. GPU 0 has a total capacity ...
The fix is the same as in problem 4. If changing gpu_memory_utilization leaves the error message unchanged (not even the numbers move), it is almost certainly max_model_len that is at fault: at startup vLLM pre-allocates a block of GPU memory to hold max_model_len zero-filled KV-cache entries in reserve, and the error is raised when that allocation does not fit. The relationship between max_model_len and the KV cache is explained under problem 4.
- Problem 6: head divisibility
total number of attention heads(16) must be divisible by tensor parallel size(6)
This means --tensor-parallel-size is set badly: the number of attention heads must be divisible by it. The head count can be found in the model's config.json.
In vllm, tensor-parallel-size=N means tensor parallelism: formally, the QKV projection weight matrices are split into N shards, and each GPU stores and computes only its own shard. In plain terms, an attention layer is made of many heads; the heads are split evenly across N GPUs (with either row-parallel or column-parallel layouts), computed locally, and the partial results are then combined. A toy illustration follows below.
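A minimal sketch of the column-parallel idea using plain torch tensors (toy sizes, not vLLM's actual implementation): splitting the Q projection by heads across two "GPUs" and concatenating the partial outputs reproduces the full result.

import torch

hidden, n_heads, head_dim, tp = 64, 16, 4, 2      # toy sizes: 16 heads split across tp=2 ranks
w_q = torch.randn(hidden, n_heads * head_dim)     # full Q projection weight
x = torch.randn(1, hidden)

shards = torch.chunk(w_q, tp, dim=1)              # each rank keeps n_heads/tp heads worth of columns
partials = [x @ w for w in shards]                # each rank computes only its own heads
full = torch.cat(partials, dim=1)
assert torch.allclose(full, x @ w_q, atol=1e-5)   # combined shards match the unsharded computation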
- Problem 7: distributed startup issue
RuntimeError: Cannot re-initialize CUDA in forked subprocess. To use CUDA with multiprocessing, you must use the 'spawn' start method
Cause: the distributed/multiprocessing machinery of vllm and torch conflict. Explicitly specifying the worker start method, either as an environment variable or in the launch script, fixes it:
import os
os.environ['VLLM_WORKER_MULTIPROC_METHOD']='spawn'
3. Rewriting the Serving Code and Deployment
Starting the service with vllm serve and calling it via HTTP requests is the usual approach; it was covered in detail in an earlier post (qwen2.5-vl使用vllm部署gradio页面调用).
For the on-premise deployment at work I also tried another approach: adapting the official qwen2.5-vl local gradio demo. vllm + gradio is an asynchronous setup, so the inference model has to be built on vllm's AsyncLLMEngine.
# -*- coding: utf-8 -*-
# @Time : 2025/3/10 19:31
# @Author : yblir
# @File : dist_demo_mm.py
# explain :
# =======================================================
import copy
import re
from argparse import ArgumentParser
# from threading import Thread
import threading
from vllm import SamplingParams, AsyncLLMEngine
from vllm.engine.arg_utils import AsyncEngineArgs
import gradio as gr
import torch
from qwen_vl_utils import process_vision_info
from transformers import AutoProcessor, Qwen2_5_VLForConditionalGeneration, TextIteratorStreamer
# DEFAULT_CKPT_PATH = '/home/Qwen2.5-VL-7B-Instruct'
# DEFAULT_CKPT_PATH = r'E:\PyCharm\PreTrainModel\Qwen2.5-VL-7B-Instruct'
DEFAULT_CKPT_PATH = '/home/Qwen2.5-VL-3B-Instruct-AWQ'
def _get_args():
parser = ArgumentParser()
parser.add_argument('-c',
'--checkpoint-path',
type=str,
default=DEFAULT_CKPT_PATH,
help='Checkpoint name or path, default to %(default)r')
parser.add_argument('--cpu-only', action='store_true', help='Run demo with CPU only')
parser.add_argument('--flash-attn2',
action='store_true',
default=False,
help='Enable flash_attention_2 when loading the model.')
parser.add_argument('--share',
action='store_true',
default=False,
help='Create a publicly shareable link for the interface.')
parser.add_argument('--inbrowser',
action='store_true',
default=False,
help='Automatically launch the interface in a new tab on the default browser.')
# ===================================================================================================================
parser.add_argument('--server-port', type=int, default=7860, help='Demo server port.')
parser.add_argument('--server-name', type=str, default='0.0.0.0', help='Demo server name.')
parser.add_argument('--max-image-nums',
type=int,
default=10,
help='Maximum number of images accepted across a multi-turn conversation')
parser.add_argument('--dtype',
type=str,
default='float16',
help='Data type for model computation; float16 by default to reduce GPU memory usage')
parser.add_argument('--gpu-memory-utilization',
type=float,
default=0.15,
help='Fraction of GPU memory vllm may use; lower it if other programs share the GPU')
parser.add_argument('--enforce-eager',
action='store_true',
default=True,
help='If false, CUDA graphs are used, which increases GPU memory usage')
parser.add_argument('--max-model-len',
type=int,
default=4096,
help='Maximum number of input tokens; for multi-turn chats set this high, otherwise the token '
'count will overflow. It can also be left unset (no limit), at the risk of running out of GPU memory')
parser.add_argument('--tensor-parallel-size',
type=int,
default=2,
help='Number of GPUs used per machine; the attention head count must be divisible by this value')
parser.add_argument('--pipeline-parallel-size',
type=int,
default=2,
help='Number of machines used in the distributed setup')
args = parser.parse_args()
return args
def _parse_text(text):
lines = text.split('\n')
lines = [line for line in lines if line != '']
count = 0
for i, line in enumerate(lines):
if '```' in line:
count += 1
items = line.split('`')
if count % 2 == 1:
lines[i] = f'<pre><code class="language-{items[-1]}">'
else:
lines[i] = '<br></code></pre>'
else:
if i > 0:
if count % 2 == 1:
line = line.replace('`', r'\`')
line = line.replace('<', '&lt;')
line = line.replace('>', '&gt;')
line = line.replace(' ', '&nbsp;')
line = line.replace('*', '&ast;')
line = line.replace('_', '&lowbar;')
line = line.replace('-', '&#45;')
line = line.replace('.', '&#46;')
line = line.replace('!', '&#33;')
line = line.replace('(', '&#40;')
line = line.replace(')', '&#41;')
line = line.replace('$', '&#36;')
lines[i] = '<br>' + line
text = ''.join(lines)
return text
def _remove_image_special(text):
text = text.replace('<ref>', '').replace('</ref>', '')
return re.sub(r'<box>.*?(</box>|$)', '', text)
def _is_video_file(filename):
video_extensions = ['.mp4', '.avi', '.mkv', '.mov', '.wmv', '.flv', '.webm', '.mpeg']
return any(filename.lower().endswith(ext) for ext in video_extensions)
def _gc():
import gc
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
def _transform_messages(original_messages):
transformed_messages = []
for message in original_messages:
new_content = []
for item in message['content']:
if 'image' in item:
new_item = {'type': 'image', 'image': item['image']}
elif 'text' in item:
new_item = {'type': 'text', 'text': item['text']}
elif 'video' in item:
new_item = {'type': 'video', 'video': item['video']}
else:
continue
new_content.append(new_item)
new_message = {'role': message['role'], 'content': new_content}
transformed_messages.append(new_message)
return transformed_messages
async def async_generate(request_id, engine, inputs):
sampling_params = SamplingParams(
temperature=0.1,
top_p=0.001,
repetition_penalty=1.05,
max_tokens=512,
stop_token_ids=[]
)
response_gen = engine.generate(
request_id=request_id,
prompt=inputs,
sampling_params=sampling_params
)
async for result in response_gen:
for output in result.outputs:
yield output.text
def _launch_demo(args, model, processor):
async def call_local_model(model, processor, messages):
request_id = f"req-{threading.get_ident()}"
messages = _transform_messages(messages)
text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
# image_inputs, video_inputs = process_vision_info(messages)
image_inputs, video_inputs, video_kwargs = process_vision_info(messages, return_video_kwargs=True)
mm_data = {}
if image_inputs is not None:
mm_data['image'] = image_inputs
if video_inputs is not None:
mm_data["video"] = video_inputs
ll_inputs = {
'prompt' : text,
'multi_modal_data' : mm_data,
# FPS will be returned in video_kwargs
"mm_processor_kwargs": video_kwargs
}
async for new_next in async_generate(request_id, model, ll_inputs):
yield new_next
def create_predict_fn():
async def predict(_chatbot, task_history):
nonlocal model, processor
chat_query = _chatbot[-1][0]
query = task_history[-1][0]
if len(chat_query) == 0:
_chatbot.pop()
task_history.pop()
yield _chatbot
return  # empty query: stop here instead of falling through to generation
print('User: ' + _parse_text(query))
history_cp = copy.deepcopy(task_history)
full_response = ''
messages = []
content = []
for q, a in history_cp:
if isinstance(q, (tuple, list)):
if _is_video_file(q[0]):
content.append({'video': f'file://{q[0]}'})
else:
content.append({'image': f'file://{q[0]}'})
else:
content.append({'text': q})
messages.append({'role': 'user', 'content': content})
messages.append({'role': 'assistant', 'content': [{'text': a}]})
content = []
messages.pop()
async for response in call_local_model(model, processor, messages):
# print("response=",response)
_chatbot[-1] = (_parse_text(chat_query), _remove_image_special(_parse_text(response)))
yield _chatbot
full_response = _parse_text(response)
task_history[-1] = (query, full_response)
print('Qwen-VL-Chat: ' + _parse_text(full_response))
yield _chatbot
return predict
def create_regenerate_fn():
async def regenerate(_chatbot, task_history):
nonlocal model, processor
if not task_history:
yield _chatbot
return  # empty history: nothing to regenerate
item = task_history[-1]
if item[1] is None:
yield _chatbot
return  # last turn has no model answer to regenerate
task_history[-1] = (item[0], None)
chatbot_item = _chatbot.pop(-1)
if chatbot_item[0] is None:
_chatbot[-1] = (_chatbot[-1][0], None)
else:
_chatbot.append((chatbot_item[0], None))
_chatbot_gen = predict(_chatbot, task_history)
async for _chatbot in _chatbot_gen:
yield _chatbot
return regenerate
predict = create_predict_fn()
regenerate = create_regenerate_fn()
def add_text(history, task_history, text):
task_text = text
history = history if history is not None else []
task_history = task_history if task_history is not None else []
history = history + [(_parse_text(text), None)]
task_history = task_history + [(task_text, None)]
return history, task_history, ''
def add_file(history, task_history, file):
history = history if history is not None else []
task_history = task_history if task_history is not None else []
history = history + [((file.name,), None)]
task_history = task_history + [((file.name,), None)]
return history, task_history
def reset_user_input():
return gr.update(value='')
def reset_state(_chatbot, task_history):
task_history.clear()
_chatbot.clear()
_gc()
return []
with gr.Blocks() as demo:
gr.Markdown("""\
<p align="center"><img src="https://modelscope.oss-cn-beijing.aliyuncs.com/resource/qwen.png" style="height: 80px"/><p>"""
)
gr.Markdown("""<center><font size=8>Qwen2.5-VL</center>""")
gr.Markdown("""\
<center><font size=3>This WebUI is based on Qwen2.5-VL, developed by Alibaba Cloud.</center>""")
gr.Markdown("""<center><font size=3>本WebUI基于Qwen2.5-VL�?/center>""")
chatbot = gr.Chatbot(label='Qwen2.5-VL', elem_classes='control-height', height=500)
query = gr.Textbox(lines=2, label='Input')
task_history = gr.State([])
with gr.Row():
addfile_btn = gr.UploadButton('📁 Upload (上传文件)', file_types=['image', 'video'])
submit_btn = gr.Button('🚀 Submit (发送)')
regen_btn = gr.Button('🤔️ Regenerate (重试)')
empty_bin = gr.Button('🧹 Clear History (清除历史)')
submit_btn.click(add_text, [chatbot, task_history, query],
[chatbot, task_history]).then(predict, [chatbot, task_history], [chatbot], show_progress=True)
submit_btn.click(reset_user_input, [], [query])
empty_bin.click(reset_state, [chatbot, task_history], [chatbot], show_progress=True)
regen_btn.click(regenerate, [chatbot, task_history], [chatbot], show_progress=True)
addfile_btn.upload(add_file, [chatbot, task_history, addfile_btn], [chatbot, task_history], show_progress=True)
gr.Markdown("""\
<font size=2>Note: This demo is governed by the original license of Qwen2.5-VL. \
We strongly advise users not to knowingly generate or allow others to knowingly generate harmful content, \
including hate speech, violence, pornography, deception, etc. \
(注:本演示受Qwen2.5-VL的许可协议限制。我们强烈建议,用户不应传播及不应允许他人传播以下内容,\
包括但不限于仇恨言论、暴力、色情、欺诈相关的有害信息。)""")
demo.queue().launch(
share=args.share,
inbrowser=args.inbrowser,
server_port=args.server_port,
server_name=args.server_name,
)
def main():
args = _get_args()
engine_args = AsyncEngineArgs(
model=args.checkpoint_path,
limit_mm_per_prompt={"image": args.max_image_nums, "video": 10},
dtype=args.dtype,
gpu_memory_utilization=args.gpu_memory_utilization,
enforce_eager=args.enforce_eager,
max_model_len=args.max_model_len,
tensor_parallel_size=args.tensor_parallel_size,
pipeline_parallel_size=args.pipeline_parallel_size
)
model = AsyncLLMEngine.from_engine_args(engine_args)
processor = AutoProcessor.from_pretrained(args.checkpoint_path)
_launch_demo(args, model, processor)
# example prompt: briefly describe this image
if __name__ == '__main__':
main()
Deploying Qwen2.5-VL-72B-Instruct across 2 machines with 2080 Ti cards (16 GPUs in total) uses about 149 GB of GPU memory overall.
On the difference between bfloat16 and float16
bfloat16 is only supported on Ampere and newer architectures (compute capability >= 8.0), so most of the company's older cards (2080 Ti, RTX Titan) cannot use it. I therefore tested the memory footprint of both dtypes on an RTX 4060; a quick way to check support is shown below.
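A minimal check (plain torch calls) for whether the current GPU can use bfloat16:

import torch

major, minor = torch.cuda.get_device_capability(0)
print(f"compute capability: {major}.{minor}")            # 7.5 on 2080 Ti / RTX Titan, 8.9 on RTX 4060
print("bf16 supported:", torch.cuda.is_bf16_supported())  # False on pre-Ampere cards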
On a single machine with four 4060 cards, deploying Qwen2.5-VL-7B-Instruct with each of the two dtypes gave the following GPU memory usage:
(27966 - 17938) / 1024 ≈ 9.8 GB
Overall, bfloat16 used about 9.8 GB more GPU memory than float16 here. When GPU memory is tight, float16 is the better choice.