Simple serial execution
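The script below loads Qwen2.5-VL with transformers, builds a single multimodal user message from the given prompt plus image(s) or video(s), runs one generate() call, and reports total time, generated token count, tokens per second, and the average GPU memory sampled by a background thread.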
from transformers import Qwen2_5_VLForConditionalGeneration, AutoProcessor
from qwen_vl_utils import process_vision_info
import torch, time, threading
def llm(
model_path,
prompt=None,
image=None,
video=None,
images=None,
videos=None,
max_new_tokens=2048,
temperature=0.6,
):
"""
model_path: 模型路径
prompt: 文本prompt
image: 单张图片路径
video: 单个视频路径
images: 图片路径列表
videos: 视频路径列表
max_new_tokens: 最大生成token数
temperature: 采样温度
"""
old_time = time.time()
gpu_memories = []
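    # Background thread target: samples torch.cuda.memory_allocated() every 5 seconds while generation runs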
def monitor_gpu_memory():
import torch
while not getattr(monitor_gpu_memory, 'stop', False):
mem = torch.cuda.memory_allocated() / 1024 / 1024
gpu_memories.append(mem)
time.sleep(5)
model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
model_path,
torch_dtype=torch.bfloat16,
attn_implementation="flash_attention_2",
device_map="cuda"
)
monitor_gpu_memory.stop = False
mem_thread = threading.Thread(target=monitor_gpu_memory)
mem_thread.start()
processor = AutoProcessor.from_pretrained(model_path, use_fast=True)
contents = []
if prompt is not None:
contents.append({"type": "text", "text": prompt})
if images is not None:
for img in images:
contents.append({"type": "image", "image": img})
elif image is not None:
contents.append({"type": "image", "image": image})
elif videos is not None:
for vid in videos:
contents.append({"type": "video", "video": vid})
elif video is not None:
contents.append({"type": "video", "video": video})
messages = [{"role": "user", "content": contents}]
text = processor.apply_chat_template(
messages, tokenize=False, add_generation_prompt=True
)
image_inputs, video_inputs = process_vision_info(messages)
inputs = processor(
text=[text],
images=image_inputs,
videos=video_inputs,
padding=True,
return_tensors="pt",
)
inputs = inputs.to(model.device)
    generated_ids = model.generate(
        **inputs,
        max_new_tokens=max_new_tokens,
        do_sample=True,  # sampling must be enabled, otherwise the temperature argument is ignored
        temperature=temperature
    )
generated_ids_trimmed = [
out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
]
output_text = processor.batch_decode(
generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
)
monitor_gpu_memory.stop = True
mem_thread.join()
total_time = time.time() - old_time
total_tokens = sum(len(ids) for ids in generated_ids_trimmed)
speed = total_tokens / total_time if total_time > 0 else 0
avg_mem = sum(gpu_memories) / len(gpu_memories) if gpu_memories else 0
return {
"output_text": output_text,
"total_time": total_time,
"total_tokens": total_tokens,
"speed": speed,
"avg_mem": avg_mem,
}
if __name__ == "__main__":
model_path = "/mnt/d/LLaMA-Factory/Qwen/Qwen2.5-VL-7B-Instruct-unsloth-bnb-4bit"
result = llm(
model_path=model_path,
prompt="识别图中文字,如果有表格等特殊格式需要保留原格式。不用解释和总结,直接输出识别结果。",
image=r"/mnt/c/Users/CJK/Desktop/3.png",
max_new_tokens=2048,
temperature=1.0
)
print(result["output_text"])
print(f"总耗时: {result['total_time']:.2f}s, 生成token数: {result['total_tokens']}, 输出速度: {result['speed']:.2f} token/s, 平均占用显存: {result['avg_mem']:.2f} MB")
result = llm(
model_path=model_path,
prompt="识别图中文字,如果有表格等特殊格式需要保留原格式。不用解释和总结,直接输出识别结果。",
images=[r"/mnt/c/Users/CJK/Desktop/1.png", r"/mnt/c/Users/CJK/Desktop/3.png"],
max_new_tokens=2048,
temperature=0.6
)
print(result["output_text"])
print(f"总耗时: {result['total_time']:.2f}s, 生成token数: {result['total_tokens']}, 输出速度: {result['speed']:.2f} token/s, 平均占用显存: {result['avg_mem']:.2f} MB")
result = llm(
model_path=model_path,
prompt="识别图中文字,如果有表格等特殊格式需要保留原格式。不用解释和总结,直接输出识别结果。",
video=r"/mnt/c/Users/CJK/Desktop/2.mp4",
max_new_tokens=2048,
temperature=0.6
)
print(result["output_text"])
print(f"总耗时: {result['total_time']:.2f}s, 生成token数: {result['total_tokens']}, 输出速度: {result['speed']:.2f} token/s, 平均占用显存: {result['avg_mem']:.2f} MB")
print(f"总耗时: {result['total_time']:.2f}s, 生成token数: {result['total_tokens']}, 输出速度: {result['speed']:.2f} token/s, 平均占用显存: {result['avg_mem']:.2f} MB")
result = llm(
model_path=model_path,
prompt="识别图中文字,如果有表格等特殊格式需要保留原格式。不用解释和总结,直接输出识别结果。",
videos=[r"/mnt/c/Users/CJK/Desktop/1.mp4", r"/mnt/c/Users/CJK/Desktop/2.mp4"],
max_new_tokens=2048,
temperature=0.6
)
print(result["output_text"])
print(f"总耗时: {result['total_time']:.2f}s, 生成token数: {result['total_tokens']}, 输出速度: {result['speed']:.2f} token/s, 平均占用显存: {result['avg_mem']:.2f} MB")
Asynchronous / parallel execution
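This variant caches the model and processor in module-level globals so they are loaded only once, and adds a parallel mode: when several images or videos are passed with parallel=True, each item is submitted to a ThreadPoolExecutor and handled by process_single_item. Note that generate() itself is guarded by a lock, so only the preprocessing (chat templating, vision loading, tokenization) actually overlaps across threads.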
from transformers import Qwen2_5_VLForConditionalGeneration, AutoProcessor
from qwen_vl_utils import process_vision_info
import torch, time, threading
import concurrent.futures
from typing import List, Dict, Union, Optional, Any
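# Module-level cache: the model and processor are loaded once and shared by every call and worker thread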
_MODEL = None
_PROCESSOR = None
_MODEL_LOCK = threading.Lock()
def load_model_and_processor(model_path):
"""加载模型和处理器,如果已经加载则返回缓存的实例"""
global _MODEL, _PROCESSOR
with _MODEL_LOCK:
if _MODEL is None or _PROCESSOR is None:
_MODEL = Qwen2_5_VLForConditionalGeneration.from_pretrained(
model_path,
torch_dtype=torch.bfloat16,
attn_implementation="flash_attention_2",
device_map="cuda"
)
_PROCESSOR = AutoProcessor.from_pretrained(model_path, use_fast=True)
return _MODEL, _PROCESSOR
def llm(
model_path,
prompt=None,
image=None,
video=None,
images=None,
videos=None,
max_new_tokens=2048,
temperature=0.6,
parallel=False,
max_workers=4,
):
"""
model_path: 模型路径
prompt: 文本prompt
image: 单张图片路径
video: 单个视频路径
images: 图片路径列表
videos: 视频路径列表
max_new_tokens: 最大生成token数
temperature: 采样温度
parallel: 是否并行处理多个图片/视频
max_workers: 并行处理的最大工作线程数
"""
if parallel and ((images and len(images) > 1) or (videos and len(videos) > 1)):
return parallel_process(
model_path=model_path,
prompt=prompt,
images=images,
videos=videos,
max_new_tokens=max_new_tokens,
temperature=temperature,
max_workers=max_workers
)
old_time = time.time()
gpu_memories = []
def monitor_gpu_memory():
import torch
while not getattr(monitor_gpu_memory, 'stop', False):
mem = torch.cuda.memory_allocated() / 1024 / 1024
gpu_memories.append(mem)
time.sleep(5)
model, processor = load_model_and_processor(model_path)
monitor_gpu_memory.stop = False
mem_thread = threading.Thread(target=monitor_gpu_memory)
mem_thread.start()
contents = []
if prompt is not None:
contents.append({"type": "text", "text": prompt})
if images is not None:
for img in images:
contents.append({"type": "image", "image": img})
elif image is not None:
contents.append({"type": "image", "image": image})
elif videos is not None:
for vid in videos:
contents.append({"type": "video", "video": vid})
elif video is not None:
contents.append({"type": "video", "video": video})
messages = [{"role": "user", "content": contents}]
text = processor.apply_chat_template(
messages, tokenize=False, add_generation_prompt=True
)
image_inputs, video_inputs = process_vision_info(messages)
inputs = processor(
text=[text],
images=image_inputs,
videos=video_inputs,
padding=True,
return_tensors="pt",
)
inputs = inputs.to(model.device)
    generated_ids = model.generate(
        **inputs,
        max_new_tokens=max_new_tokens,
        do_sample=True,  # sampling must be enabled, otherwise the temperature argument is ignored
        temperature=temperature
    )
generated_ids_trimmed = [
out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
]
output_text = processor.batch_decode(
generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
)
monitor_gpu_memory.stop = True
mem_thread.join()
total_time = time.time() - old_time
total_tokens = sum(len(ids) for ids in generated_ids_trimmed)
speed = total_tokens / total_time if total_time > 0 else 0
avg_mem = sum(gpu_memories) / len(gpu_memories) if gpu_memories else 0
return {
"output_text": output_text,
"total_time": total_time,
"total_tokens": total_tokens,
"speed": speed,
"avg_mem": avg_mem,
}
def process_single_item(
model_path: str,
prompt: Optional[str],
item_path: str,
is_video: bool = False,
max_new_tokens: int = 2048,
temperature: float = 0.6,
) -> Dict[str, Any]:
"""处理单个图片或视频"""
model, processor = load_model_and_processor(model_path)
old_time = time.time()
gpu_memories = []
def monitor_gpu_memory():
import torch
while not getattr(monitor_gpu_memory, 'stop', False):
mem = torch.cuda.memory_allocated() / 1024 / 1024
gpu_memories.append(mem)
time.sleep(5)
monitor_gpu_memory.stop = False
mem_thread = threading.Thread(target=monitor_gpu_memory)
mem_thread.start()
contents = []
if prompt is not None:
contents.append({"type": "text", "text": prompt})
if is_video:
contents.append({"type": "video", "video": item_path})
else:
contents.append({"type": "image", "image": item_path})
messages = [{"role": "user", "content": contents}]
text = processor.apply_chat_template(
messages, tokenize=False, add_generation_prompt=True
)
image_inputs, video_inputs = process_vision_info(messages)
inputs = processor(
text=[text],
images=image_inputs,
videos=video_inputs,
padding=True,
return_tensors="pt",
)
inputs = inputs.to(model.device)
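    # NOTE: _MODEL_LOCK serializes generate() on the shared model, so concurrent workers
    # overlap only during preprocessing; GPU generation still runs one request at a time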
with _MODEL_LOCK:
        generated_ids = model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            do_sample=True,  # sampling must be enabled, otherwise the temperature argument is ignored
            temperature=temperature
        )
generated_ids_trimmed = [
out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
]
output_text = processor.batch_decode(
generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
)
monitor_gpu_memory.stop = True
mem_thread.join()
total_time = time.time() - old_time
total_tokens = sum(len(ids) for ids in generated_ids_trimmed)
speed = total_tokens / total_time if total_time > 0 else 0
avg_mem = sum(gpu_memories) / len(gpu_memories) if gpu_memories else 0
return {
"output_text": output_text,
"total_time": total_time,
"total_tokens": total_tokens,
"speed": speed,
"avg_mem": avg_mem,
}
def parallel_process(
model_path: str,
prompt: Optional[str] = None,
images: Optional[List[str]] = None,
videos: Optional[List[str]] = None,
max_new_tokens: int = 2048,
temperature: float = 0.6,
max_workers: int = 4,
) -> Dict[str, Any]:
"""并行处理多个图片或视频"""
start_time = time.time()
results = []
load_model_and_processor(model_path)
items = []
is_video_flags = []
if images:
items.extend(images)
is_video_flags.extend([False] * len(images))
if videos:
items.extend(videos)
is_video_flags.extend([True] * len(videos))
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Keep the submission index for each future so outputs can be restored to input order
        # (as_completed yields futures in completion order, not submission order)
        future_to_item = {
            executor.submit(
                process_single_item,
                model_path,
                prompt,
                item,
                is_video,
                max_new_tokens,
                temperature
            ): (idx, item, is_video)
            for idx, (item, is_video) in enumerate(zip(items, is_video_flags))
        }
        indexed_results = []
        for future in concurrent.futures.as_completed(future_to_item):
            idx, item, is_video = future_to_item[future]
            try:
                indexed_results.append((idx, future.result()))
            except Exception as e:
                print(f"Error while processing {'video' if is_video else 'image'} {item}: {e}")
    results = [r for _, r in sorted(indexed_results, key=lambda t: t[0])]
    all_output_texts = [result["output_text"] for result in results]
total_time = time.time() - start_time
total_tokens = sum(result["total_tokens"] for result in results)
avg_speed = total_tokens / total_time if total_time > 0 else 0
avg_mem = sum(result.get("avg_mem", 0) for result in results) / len(results) if results else 0
return {
"output_text": all_output_texts,
"total_time": total_time,
"total_tokens": total_tokens,
"speed": avg_speed,
"avg_mem": avg_mem,
"individual_results": results
}
if __name__ == "__main__":
model_path = "/mnt/d/LLaMA-Factory/Qwen/Qwen2.5-VL-7B-Instruct-unsloth-bnb-4bit"
result = llm(
model_path=model_path,
prompt="识别图中文字,如果有表格等特殊格式需要保留原格式。不用解释和总结,直接输出识别结果。",
images=[r"/mnt/c/Users/CJK/Desktop/1.png", r"/mnt/c/Users/CJK/Desktop/2.png", r"/mnt/c/Users/CJK/Desktop/3.png", r"/mnt/c/Users/CJK/Desktop/4.png", r"/mnt/c/Users/CJK/Desktop/5.png", r"/mnt/c/Users/CJK/Desktop/6.png", r"/mnt/c/Users/CJK/Desktop/7.png", r"/mnt/c/Users/CJK/Desktop/8.png"],
max_new_tokens=2048,
temperature=0.6,
parallel=True,
max_workers=8
)
print("并行处理结果:")
for i, text in enumerate(result["output_text"]):
print(f"图片 {i+1} 结果: {text}")
print(f"总耗时: {result['total_time']:.2f}s, 生成token数: {result['total_tokens']}, 平均输出速度: {result['speed']:.2f} token/s, 平均占用显存: {result['avg_mem']:.2f} MB")