目录
- 实现讲解
- 效果展示
- 完整代码
本文作为上篇博客的延续,在之前实现了图片推理的基础上,进一步介绍如何进行视频推理。
实现讲解
首先,我们需要对之前的 predict_and_show_image 函数进行拆分,将图像显示与推理器(predictor)的初始化解耦。这样做的目的是在进行视频推理前,能够先完成推理器的初始化、模型配置的设置,以及模型与 OpenVINO 优化模型的绑定,从而确保推理过程能够充分利用 OpenVINO 提供的高性能推理引擎。
以下是修改后的代码示例:
def initialize_predictor(det_model: YOLO, compiled_model: ov.CompiledModel, conf=0.25, batch=1, save=False, mode="predict"):
"""
初始化 det_model 的 predictor,如果尚未初始化,并设置 OpenVINO 编译好的模型。
参数:
det_model: 模型对象,包含 predictor, model, overrides 等属性。
compiled_model: OpenVINO 编译好的模型对象。
conf: 置信度阈值,默认 0.25。
batch: 批处理大小,默认 1。
save: 是否保存结果,默认 False。
mode: 推理模式,默认 "predict"。
"""
if det_model.predictor is None:
config = {"conf": conf, "batch": batch, "save": save, "mode": mode}
args = {**det_model.overrides, **config}
det_model.predictor = det_model._smart_load("predictor")(
overrides=args,
_callbacks=det_model.callbacks
)
det_model.predictor.setup_model(model=det_model.model)
det_model.predictor.model.ov_compiled_model = compiled_model
def predict_and_show_image(det_model: YOLO, compiled_model: ov.CompiledModel):
"""
使用模型对图像进行目标检测,并通过 Tkinter GUI 显示检测结果
"""
results = det_model(IMAGE_PATH)
result_img = Image.fromarray(results[0].plot()[:, :, ::-1])
root = tk.Tk()
root.title("YOLOv11 (OpenVINO INT8) Detection Result")
tk_img = ImageTk.PhotoImage(result_img)
label = tk.Label(root, image=tk_img)
label.pack()
root.mainloop()
接下来,我们需要实现一个函数,用于结合 YOLO 和 OpenVINO 实现对视频的实时目标检测。该函数支持从视频文件或摄像头读取图像帧,并在完成推理处理后,实时显示检测结果、推理耗时和帧率(FPS)。此外,检测结果还可以保存为带有边框标注和帧率信息的视频文件,同时支持将检测信息(如类别、时间戳等)记录为日志文件,便于后续分析。
以下是具体的实现代码:
def run_video_inference(det_model, source=0, flip=True, width=None,
save_video=True, save_log=True,
output_dir="results"):
"""
使用 OpenVINO 和 YOLO 模型对视频进行目标检测推理并可视化。
参数:
det_model:YOLO 推理模型,支持 __call__(image) 方式调用
source:视频源,可以是文件路径或摄像头索引, 0 表示默认摄像头
flip:是否镜像翻转视频(镜像)
width:设置画面宽度(自动等比例缩放), None 表示保持原尺寸
save_video: 是否保存检测后的视频(MP4)
save_log: 是否记录检测日志
output_dir: 输出文件目录
"""
cap = cv2.VideoCapture(source)
if not cap.isOpened():
print(f"无法打开视频源: {source}")
return
Path(output_dir).mkdir(parents=True, exist_ok=True)
# 视频保存设置(保存为 MP4)
out_writer = None
output_path = str(Path(output_dir) / "output_yolo.mp4")
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
# 日志文件准备
log_file = open(Path(output_dir) / "detection_log.txt", "w") if save_log else None
cv2.namedWindow("YOLO OpenVINO Detection", cv2.WINDOW_GUI_NORMAL | cv2.WINDOW_AUTOSIZE)
processing_times = collections.deque()
try:
while True:
ret, frame = cap.read()
if not ret:
print("视频读取失败或结束。")
break
# 检测窗口是否关闭
if cv2.getWindowProperty("YOLO OpenVINO Detection", cv2.WND_PROP_VISIBLE) < 1:
print("窗口已关闭,退出推理。")
break
# 镜像翻转(常用于摄像头)
if flip:
frame = cv2.flip(frame, 1)
# 等比例缩放画面到指定宽度
if width:
scale = width / max(frame.shape)
frame = cv2.resize(frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA)
input_image = np.array(frame)
# 计时推理开始
start_time = time.time()
results = det_model(input_image, verbose=False)
stop_time = time.time()
# 保存日志
if log_file:
for result in results:
for box in result.boxes:
cls_id = int(box.cls)
conf = float(box.conf)
log_file.write(f"time: {stop_time:.3f}, class_id: {cls_id}, confidence: {conf:.2f}\n")
# 绘制检测结果
frame = results[0].plot()
# 推理时间统计,用于计算平均推理时间和 FPS
processing_times.append(stop_time - start_time)
if len(processing_times) > 200:
processing_times.popleft()
avg_time = np.mean(processing_times) * 1000
fps = 1000 / avg_time
# 显示推理时间和 FPS
_, f_width = frame.shape[:2]
cv2.putText(
frame,
f"Inference: {avg_time:.1f}ms ({fps:.1f} FPS)",
(20, 40),
cv2.FONT_HERSHEY_COMPLEX,
f_width / 1000,
(0, 0, 255),
1,
cv2.LINE_AA
)
# 初始化 MP4 写入器
if save_video and out_writer is None:
height, width_out = frame.shape[:2]
out_writer = cv2.VideoWriter(output_path, fourcc, 25.0, (width_out, height))
if save_video and out_writer:
out_writer.write(frame)
cv2.imshow("YOLO OpenVINO Detection", frame)
# ESC 键退出
if cv2.waitKey(1) == 27:
print("按下 ESC 键,退出。")
break
except KeyboardInterrupt:
print("用户中断。")
except RuntimeError as e:
print("运行错误:", e)
finally:
cap.release()
cv2.destroyAllWindows()
if out_writer:
out_writer.release()
if log_file:
log_file.close()
print(f"检测视频保存至: {output_path}")
if save_log:
print(f"检测日志保存至: {Path(output_dir) / 'detection_log.txt'}")
最后就是调用这个函数的部分了,下面是具体的示例代码:
# 5. 初始化 predictor
initialize_predictor(det_model, compiled_int8_model)
# 6. 执行视频推理
run_video_inference(
det_model=det_model, # 模型对象
source="./1.mp4", # 视频路径或摄像头编号
flip=False, # 是否镜像
width=1280, # 画面宽度
save_video=True, # 是否保存 MP4 视频
save_log=True, # 是否保存检测日志
output_dir="./results" # 输出目录
)
效果展示
完整代码
from pathlib import Path
from zipfile import ZipFile
from typing import Dict
import urllib.request
import tkinter as tk
from PIL import Image, ImageTk
from ultralytics import YOLO
from ultralytics.utils import DEFAULT_CFG
from ultralytics.cfg import get_cfg
from ultralytics.data.converter import coco80_to_coco91_class
from ultralytics.data.utils import check_det_dataset
import openvino as ov
import nncf
import cv2
import numpy as np
import collections
import time
# ----------------------------- #
# 全局配置和路径定义
# ----------------------------- #
MODEL_VARIANTS = ["yolo11n", "yolo11s", "yolo11m", "yolo11l", "yolo11x"]
MODEL_NAME = MODEL_VARIANTS[0] # 默认使用最轻量的 yolo11n 模型
PT_MODEL_PATH = f"{MODEL_NAME}.pt"
IR_MODEL_DIR = Path(f"{MODEL_NAME}_openvino_model")
IR_MODEL_PATH = IR_MODEL_DIR / f"{MODEL_NAME}.xml"
INT8_MODEL_PATH = Path(f"{MODEL_NAME}_openvino_int8_model/{MODEL_NAME}.xml")
IMAGE_PATH = Path("./coco_bike.jpg")
OUT_DIR = Path("./")
# COCO 数据集资源路径
DATA_URL = "http://images.cocodataset.org/zips/val2017.zip"
LABELS_URL = "https://github.com/ultralytics/yolov5/releases/download/v1.0/coco2017labels-segments.zip"
CFG_URL = "https://raw.githubusercontent.com/ultralytics/ultralytics/v8.1.0/ultralytics/cfg/datasets/coco.yaml"
DATA_PATH = OUT_DIR / "val2017.zip"
LABELS_PATH = OUT_DIR / "coco2017labels-segments.zip"
CFG_PATH = OUT_DIR / "coco.yaml"
# ----------------------------- #
# 工具函数模块
# ----------------------------- #
def download_file_if_needed(url: str, filename: str, dest_dir: Path) -> Path:
"""
下载文件(若文件已存在则跳过)
"""
dest_dir.mkdir(parents=True, exist_ok=True)
file_path = dest_dir / filename
if not file_path.exists():
print(f"Downloading: {filename}")
urllib.request.urlretrieve(url, file_path)
else:
print(f"File already exists: {file_path}")
return file_path
def prepare_test_image():
"""
确保测试图片存在,如无则从官方地址下载
"""
if not IMAGE_PATH.exists():
download_file_if_needed(
"https://storage.openvinotoolkit.org/repositories/openvino_notebooks/data/data/image/coco_bike.jpg",
IMAGE_PATH.name, IMAGE_PATH.parent
)
def load_or_export_openvino_model() -> ov.CompiledModel:
"""
加载或导出 YOLOv11 OpenVINO IR 模型,并编译为 CPU 运行时模型
"""
model = YOLO(PT_MODEL_PATH).to("cpu")
if not IR_MODEL_PATH.exists():
model.export(format="openvino", dynamic=True, half=True)
core = ov.Core()
ir_model = core.read_model(IR_MODEL_PATH)
return core.compile_model(ir_model, "CPU")
def build_ultralytics_model() -> YOLO:
"""
创建 Ultralytics 的 YOLO 模型接口,用于调用预测器
"""
return YOLO(IR_MODEL_DIR, task="detect")
def prepare_dataset() -> 'tuple[nncf.Dataset, object]':
"""
下载并解压 COCO 数据集,构造验证器和 NNCF 所需数据集格式
"""
if not (OUT_DIR / "coco/labels").exists():
download_file_if_needed(DATA_URL, DATA_PATH.name, DATA_PATH.parent)
download_file_if_needed(LABELS_URL, LABELS_PATH.name, LABELS_PATH.parent)
download_file_if_needed(CFG_URL, CFG_PATH.name, CFG_PATH.parent)
with ZipFile(LABELS_PATH, "r") as z:
z.extractall(OUT_DIR)
with ZipFile(DATA_PATH, "r") as z:
z.extractall(OUT_DIR / "coco/images")
args = get_cfg(cfg=DEFAULT_CFG)
args.data = str(CFG_PATH)
# 用 ultralytics 的 validator 构建 dataset
det_model = build_ultralytics_model();
validator_cls = det_model.task_map[det_model.task]["validator"]
validator = validator_cls(args=args)
validator.data = check_det_dataset(args.data)
validator.stride = 32
dataloader = validator.get_dataloader(OUT_DIR / "coco", 1)
validator.class_map = coco80_to_coco91_class()
validator.names = YOLO(PT_MODEL_PATH).to("cpu").model.names
validator.nc = 80
def transform_fn(data: Dict):
return validator.preprocess(data)['img'].numpy()
return nncf.Dataset(dataloader, transform_fn), validator
def quantize_model(original_model: ov.Model, quant_dataset: nncf.Dataset) -> ov.Model:
"""
使用 NNCF 对 OpenVINO 模型进行混合精度量化(混合 INT8/F32)
"""
ignored_scope = nncf.IgnoredScope(
subgraphs=[
nncf.Subgraph(
inputs=[f"__module.model.{22 if 'v8' in MODEL_NAME else 23}/aten::cat/Concat",
f"__module.model.{22 if 'v8' in MODEL_NAME else 23}/aten::cat/Concat_1",
f"__module.model.{22 if 'v8' in MODEL_NAME else 23}/aten::cat/Concat_2"],
outputs=[f"__module.model.{22 if 'v8' in MODEL_NAME else 23}/aten::cat/Concat_7"]
)
]
)
quant_model = nncf.quantize(
original_model,
quant_dataset,
preset=nncf.QuantizationPreset.MIXED,
ignored_scope=ignored_scope
)
ov.save_model(quant_model, str(INT8_MODEL_PATH))
print(f"Quantized model saved to: {INT8_MODEL_PATH}")
return quant_model
def initialize_predictor(det_model: YOLO, compiled_model: ov.CompiledModel, conf=0.25, batch=1, save=False, mode="predict"):
"""
初始化 det_model 的 predictor,如果尚未初始化,并设置 OpenVINO 编译好的模型。
参数:
det_model: 模型对象,包含 predictor, model, overrides 等属性。
compiled_model: OpenVINO 编译好的模型对象。
conf: 置信度阈值,默认 0.25。
batch: 批处理大小,默认 1。
save: 是否保存结果,默认 False。
mode: 推理模式,默认 "predict"。
"""
if det_model.predictor is None:
config = {"conf": conf, "batch": batch, "save": save, "mode": mode}
args = {**det_model.overrides, **config}
det_model.predictor = det_model._smart_load("predictor")(
overrides=args,
_callbacks=det_model.callbacks
)
det_model.predictor.setup_model(model=det_model.model)
det_model.predictor.model.ov_compiled_model = compiled_model
def predict_and_show_image(det_model: YOLO, compiled_model: ov.CompiledModel):
"""
使用模型对图像进行目标检测,并通过 Tkinter GUI 显示检测结果
"""
results = det_model(IMAGE_PATH)
result_img = Image.fromarray(results[0].plot()[:, :, ::-1])
root = tk.Tk()
root.title("YOLOv11 (OpenVINO INT8) Detection Result")
tk_img = ImageTk.PhotoImage(result_img)
label = tk.Label(root, image=tk_img)
label.pack()
root.mainloop()
def run_video_inference(det_model, source=0, flip=True, width=None,
save_video=True, save_log=True,
output_dir="results"):
"""
使用 OpenVINO 和 YOLO 模型对视频进行目标检测推理并可视化。
参数:
det_model:YOLO 推理模型,支持 __call__(image) 方式调用
source:视频源,可以是文件路径或摄像头索引, 0 表示默认摄像头
flip:是否镜像翻转视频(镜像)
width:设置画面宽度(自动等比例缩放), None 表示保持原尺寸
save_video: 是否保存检测后的视频(MP4)
save_log: 是否记录检测日志
output_dir: 输出文件目录
"""
cap = cv2.VideoCapture(source)
if not cap.isOpened():
print(f"无法打开视频源: {source}")
return
Path(output_dir).mkdir(parents=True, exist_ok=True)
# 视频保存设置(保存为 MP4)
out_writer = None
output_path = str(Path(output_dir) / "output_yolo.mp4")
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
# 日志文件准备
log_file = open(Path(output_dir) / "detection_log.txt", "w") if save_log else None
cv2.namedWindow("YOLO OpenVINO Detection", cv2.WINDOW_GUI_NORMAL | cv2.WINDOW_AUTOSIZE)
processing_times = collections.deque()
try:
while True:
ret, frame = cap.read()
if not ret:
print("视频读取失败或结束。")
break
# 检测窗口是否关闭
if cv2.getWindowProperty("YOLO OpenVINO Detection", cv2.WND_PROP_VISIBLE) < 1:
print("窗口已关闭,退出推理。")
break
# 镜像翻转(常用于摄像头)
if flip:
frame = cv2.flip(frame, 1)
# 等比例缩放画面到指定宽度
if width:
scale = width / max(frame.shape)
frame = cv2.resize(frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA)
input_image = np.array(frame)
# 计时推理开始
start_time = time.time()
results = det_model(input_image, verbose=False)
stop_time = time.time()
# 保存日志
if log_file:
for result in results:
for box in result.boxes:
cls_id = int(box.cls)
conf = float(box.conf)
log_file.write(f"time: {stop_time:.3f}, class_id: {cls_id}, confidence: {conf:.2f}\n")
# 绘制检测结果
frame = results[0].plot()
# 推理时间统计,用于计算平均推理时间和 FPS
processing_times.append(stop_time - start_time)
if len(processing_times) > 200:
processing_times.popleft()
avg_time = np.mean(processing_times) * 1000
fps = 1000 / avg_time
# 显示推理时间和 FPS
_, f_width = frame.shape[:2]
cv2.putText(
frame,
f"Inference: {avg_time:.1f}ms ({fps:.1f} FPS)",
(20, 40),
cv2.FONT_HERSHEY_COMPLEX,
f_width / 1000,
(0, 0, 255),
1,
cv2.LINE_AA
)
# 初始化 MP4 写入器
if save_video and out_writer is None:
height, width_out = frame.shape[:2]
out_writer = cv2.VideoWriter(output_path, fourcc, 25.0, (width_out, height))
if save_video and out_writer:
out_writer.write(frame)
cv2.imshow("YOLO OpenVINO Detection", frame)
# ESC 键退出
if cv2.waitKey(1) == 27:
print("按下 ESC 键,退出。")
break
except KeyboardInterrupt:
print("用户中断。")
except RuntimeError as e:
print("运行错误:", e)
finally:
cap.release()
cv2.destroyAllWindows()
if out_writer:
out_writer.release()
if log_file:
log_file.close()
print(f"检测视频保存至: {output_path}")
if save_log:
print(f"检测日志保存至: {Path(output_dir) / 'detection_log.txt'}")
# ----------------------------- #
# 主执行流程
# ----------------------------- #
def main():
# 1. 准备测试图像(如无则下载)
prepare_test_image()
# 2. 加载或导出 OpenVINO IR 模型,并编译运行(用于量化或预测)
compiled_fp_model = load_or_export_openvino_model()
# 3. 构造 Ultralytics YOLO 接口,用于推理/验证
det_model = build_ultralytics_model()
# 4. 若 INT8 模型已存在,则直接加载;否则进行量化生成
core = ov.Core()
if INT8_MODEL_PATH.exists():
quantized_model = core.read_model(INT8_MODEL_PATH)
compiled_int8_model = core.compile_model(quantized_model, "CPU")
else:
quant_dataset, _ = prepare_dataset()
quantized_model = quantize_model(core.read_model(IR_MODEL_PATH), quant_dataset)
compiled_int8_model = core.compile_model(quantized_model, "CPU")
# 5. 初始化 predictor
initialize_predictor(det_model, compiled_int8_model)
# 6. 执行视频推理
run_video_inference(
det_model=det_model, # 模型对象
source="./1.mp4", # 视频路径或摄像头编号
flip=False, # 是否镜像
width=1280, # 画面宽度
save_video=True, # 是否保存 MP4 视频
save_log=True, # 是否保存检测日志
output_dir="./results" # 输出目录
)
if __name__ == "__main__":
main()