写在前面:
本项目的代码原型基于yolov5+yolov8。其中检测模型使用的yolov5,跟踪模型使用的yolov8。
这里说明以下,为什么不整体都选择yolov8呢,v8无疑是比v5优秀的,但是atlas这块经过不断尝试没有过去,所以只能选择v5。那为什么跟踪模型选择yolov8呢,其实我这里要做的是实时视频的处理,我也不想使用deepsort那种带识别模型的笨重型跟踪框架,看了yolov8的代码,觉得相当可以,就选择了yolov8中的跟踪。原本我以为自己的水平是扣不出这块跟踪代码的,毕竟是网上大佬们经过多年迭代修改的代码,代码水平是远在我之上的。做好一件事情的最好方法,就是立刻开始做,在连续加班了2个晚上后,终于扣出来了,过程是曲折的,结果是美好的。一与一,勇者得强尔。
参考代码git链接:
Yolov5:https://github.com/ultralytics/yolov5.git (v6.1版本)
Yolov8:https://github.com/ultralytics/ultralytics.git
项目目的:
识别箕斗的状态,运行(run),静止(still),识别画面中箕斗数量(num)。
目前本文方法同时支持BoT-SORT/ByteTrack两种跟踪算法。
跟踪算法浅析:
BoT-SORT 算法:
BoT-SORT(Bottleneck Transformers for Multiple Object Tracking and Segmentation)是一种基于深度学习的多目标跟踪算法。
它的主要特点包括:
- 利用了 Transformer 架构的优势,能够对目标的特征进行有效的编码和关联。例如,在处理复杂场景中的目标时,能够捕捉到长距离的依赖关系,从而更准确地跟踪目标。
- 对目标的外观特征和运动特征进行融合。
通过结合外观信息和运动预测,提高了跟踪的准确性和稳定性。比如在目标被遮挡或短暂消失后重新出现时,能够更可靠地重新识别和跟踪。
ByteTrack 算法:
ByteTrack 是一种高效且准确的多目标跟踪算法。
其突出特点如下:
- 采用了一种简单而有效的关联策略。
它不仅仅依赖于高分检测框,还充分利用低分检测框中的信息,大大减少了目标丢失的情况。例如,在车辆密集的交通场景中,能够准确跟踪那些被部分遮挡的车辆。 - 具有较高的计算效率。
能够在保证跟踪效果的同时,降低计算成本,适用于实时应用场景。
区别:
- 准确性:
BoT-SORT 在 MOT17 和 MOT20 测试集的 MOTChallenge 数据集中排名第一,对于 MOT17 实现了 80.5 MOTA、80.2 IDF1 和 65.0 HOTA。而 ByteTrack 在速度达到30FPS(单张 V100)的情况下,各项指标也均有突破。相比 deep sort,ByteTrack 在遮挡情况下的提升非常明显。
- 速度:
ByteTrack 预测的速度感觉比 BoT-SORT 快一些,更加流畅。
- 其他指标:
BoT-SORT 可以很好地应对目标被遮挡或短暂消失后重新出现的情况,能够更可靠地重新识别和跟踪。而 ByteTrack 没有采用外表特征进行匹配,所以跟踪的效果非常依赖检测的效果,也就是说如果检测器的效果很好,跟踪也会取得不错的效果,但是如果检测的效果不好,那么会严重影响跟踪的效果。
数据集准备:
数据基于视频分解而成图片得到,基于labelimg标注,自己大概标了4天吧,一共872张。
Yolov5模型训练:
数据集目录格式如下,
data/jidou.yaml配置文件内容,
path: ./datasets/jidou # dataset root dir
train: images/train # train images (relative to 'path') 128 images
val: images/train # val images (relative to 'path') 128 images
test: images/train # test images (optional)
# Classes
nc: 1 # number of classes
names: ['jidou']
开始训练,
python3 train.py --img 640 --epochs 100 --data ./data/jidou.yaml --weights yolov5s.pt
模型转化,pt模型转化为onnx,
python export.py --weights ./jidou_model/best.pt –simplify
onnx模型转化为atlas模型,
atc --input_shape="images:1,3,640,640" --out_nodes="/model.24/Transpose:0;/model.24/Transpose_1:0;/model.24/Transpose_2:0" --output_type=FP32 --input_format=NCHW --output="./yolov5_add_bs1_fp16" --soc_version=Ascend310P3 --framework=5 --model="./best.onnx" --insert_op_conf=./insert_op.cfg
其中,fusion_result.json文件内容,
[{
"graph_fusion": {
"AConv2dMulFusion": {
"effect_times": "0",
"match_times": "57"
},
"ConstToAttrPass": {
"effect_times": "5",
"match_times": "5"
},
"ConvConcatFusionPass": {
"effect_times": "0",
"match_times": "13"
},
"ConvFormatRefreshFusionPass": {
"effect_times": "0",
"match_times": "60"
},
"ConvToFullyConnectionFusionPass": {
"effect_times": "0",
"match_times": "60"
},
"ConvWeightCompressFusionPass": {
"effect_times": "0",
"match_times": "60"
},
"CubeTransFixpipeFusionPass": {
"effect_times": "0",
"match_times": "3"
},
"FIXPIPEAPREQUANTFUSIONPASS": {
"effect_times": "0",
"match_times": "60"
},
"FIXPIPEFUSIONPASS": {
"effect_times": "0",
"match_times": "60"
},
"MulAddFusionPass": {
"effect_times": "0",
"match_times": "14"
},
"MulSquareFusionPass": {
"effect_times": "0",
"match_times": "57"
},
"RefreshInt64ToInt32FusionPass": {
"effect_times": "1",
"match_times": "1"
},
"RemoveCastFusionPass": {
"effect_times": "0",
"match_times": "123"
},
"ReshapeTransposeFusionPass": {
"effect_times": "0",
"match_times": "3"
},
"SplitConvConcatFusionPass": {
"effect_times": "0",
"match_times": "13"
},
"TransdataCastFusionPass": {
"effect_times": "0",
"match_times": "63"
},
"TransposedUpdateFusionPass": {
"effect_times": "3",
"match_times": "3"
},
"V200NotRequantFusionPass": {
"effect_times": "0",
"match_times": "7"
},
"ZConcatDFusionPass": {
"effect_times": "0",
"match_times": "13"
}
},
"session_and_graph_id": "0_0",
"ub_fusion": {
"AutomaticUbFusion": {
"effect_times": "1",
"match_times": "1",
"repository_hit_times": "0"
},
"TbeAippCommonFusionPass": {
"effect_times": "1",
"match_times": "1",
"repository_hit_times": "0"
},
"TbeConvSigmoidMulQuantFusionPass": {
"effect_times": "56",
"match_times": "56",
"repository_hit_times": "0"
}
}
}]
insert_op.cfg文件内容,
aipp_op {
aipp_mode : static
related_input_rank : 0
input_format : YUV420SP_U8
src_image_size_w : 640
src_image_size_h : 640
crop : false
csc_switch : true
rbuv_swap_switch : false
matrix_r0c0 : 256
matrix_r0c1 : 0
matrix_r0c2 : 359
matrix_r1c0 : 256
matrix_r1c1 : -88
matrix_r1c2 : -183
matrix_r2c0 : 256
matrix_r2c1 : 454
matrix_r2c2 : 0
input_bias_0 : 0
input_bias_1 : 128
input_bias_2 : 128
var_reci_chn_0 : 0.0039216
var_reci_chn_1 : 0.0039216
var_reci_chn_2 : 0.0039216
}
jidou.names文件内容,
jidou
yolov5_add_bs1_fp16.cfg文件内容,
CLASS_NUM=1
BIASES_NUM=18
BIASES=10,13,16,30,33,23,30,61,62,45,59,119,116,90,156,198,373,326
SCORE_THRESH=0.25
#SEPARATE_SCORE_THRESH=0.001,0.001,0.001,0.001,0.001,0.001,0.001,0.001,0.001,0.001
OBJECTNESS_THRESH=0.0
IOU_THRESH=0.5
YOLO_TYPE=3
ANCHOR_DIM=3
MODEL_TYPE=2
RESIZE_FLAG=0
YOLO_VERSION=5
代码编写之跟踪代码剥离:
剥离得整体思路如下,
- 先吧原始代码跑起来,效果测试是对的。
- 熟悉代码,主要熟悉trackers下面的py文件,engine中的predictor.py,results.py,model.py。
- 熟悉跟踪的本质,其实就是2个函数,一个初始化函数,一个update函数。
- 将模型和跟踪部分先剥离开(使用model.predict替换model.track)。
- 剥离Results结构体(使用传统的list替换Results得到更加通用的上下文传递变量)。
- 实现update函数(自己写代码替换tracker.update函数和predictor.results[i].update(**update_args)函数)。
- 剥离跟踪的配置文件yaml文件,选择在跟踪函数初始化赋值。
- 剥离其他依赖文件,metrics.py,ops.py。
- 剥离torch依赖,metrics.py中的batch_probiou函数基于numpy实现。
- 细节处bug修改。
最终跟踪代码track.py如下,
import os
import json
import cv2
import numpy as np
from plots import box_label, colors
from collections import defaultdict
from trackers.bot_sort import BOTSORT
from trackers.byte_tracker import BYTETracker
from names import names
class TRACK(object):
def __init__(self):
#跟踪
self.frame_rate=30
#BOTSORT
self.tracker = BOTSORT(frame_rate=self.frame_rate)
#BYTETracker
#self.tracker = BYTETracker(frame_rate=self.frame_rate)
self.track_history = defaultdict(lambda: [])
self.move_state = defaultdict(lambda: [])
self.move_state_dict = {0:"still" ,1:"run"}
self.distance = 5
def track(self, track_results, frame):
if len(track_results[0]["cls"]) != 0:
tracks = self.tracker.update(track_results[0], frame)
if len(tracks) != 0:
idx = tracks[:, -1].astype(int)
if track_results[0]["id"] is not None:
track_results[0]["id"] = np.array([track_results[0]["id"][i] for i in idx])
else:
track_results[0]["id"] = np.array(tracks[:, 4].astype(int))
track_results[0]["cls"] = np.array([track_results[0]["cls"][i] for i in idx])
track_results[0]["conf"] = np.array([track_results[0]["conf"][i] for i in idx])
track_results[0]["xywh"] = np.array([track_results[0]["xywh"][i] for i in idx])
#跟新track_history, move_state
boxes = track_results[0]["xywh"]
clses = track_results[0]["cls"]
track_ids = []
if track_results[0]["id"] is not None:
track_ids = track_results[0]["id"].tolist()
# Your code for processing track_ids
else:
print("No tracks found in this frame")
# Plot the tracks
for cls, box, track_id in zip(clses, boxes, track_ids):
x, y, w, h = box
track = self.track_history[track_id]
track.append((float(x+w/2.0), float(y+h/2.0))) # x, y center point
if len(track) > 30: # retain 90 tracks for 90 frames
track.pop(0)
if len(track)>=self.frame_rate:
if abs(track[-1][0]-track[0][0]) + abs(track[-1][1]-track[0][1])>= self.distance:
self.move_state[track_id] = self.move_state_dict[1]
else:
self.move_state[track_id] = self.move_state_dict[0]
else:
self.move_state[track_id] = self.move_state_dict[0]
return track_results
def draw(self, image, track_results):
# draw the result and save image
for index, info in enumerate(track_results[0]["xywh"]):
xyxy = [int(info[0]), int(info[1]), int(info[0])+int(info[2]), int(info[1])+int(info[3])]
classVec = int(track_results[0]["cls"][index])
conf = float(track_results[0]["conf"][index])
if track_results[0]["id"] is not None:
id = int(track_results[0]["id"][index])
else:
id = ""
if id =="":
label = f'{names[classVec]} {conf:.4f} track_id {id}'
else:
label = f'{names[classVec]} {conf:.4f} track_id {id} state {self.move_state[id]}'
annotated_frame = box_label(image, xyxy, label, color=colors[classVec])
cv2.putText(annotated_frame, "num:{}".format(len(track_results[0]["cls"])), (10,30), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 0, 0),thickness=2, lineType=cv2.LINE_AA)
boxes = track_results[0]["xywh"]
clses = track_results[0]["cls"]
track_ids = []
if track_results[0]["id"] is not None:
track_ids = track_results[0]["id"].tolist()
# Your code for processing track_ids
else:
print("No tracks found in this frame")
# Plot the tracks
for cls, box, track_id in zip(clses, boxes, track_ids):
x, y, w, h = box
track = self.track_history[track_id]
# Draw the tracking lines
points = np.hstack(track).astype(np.int32).reshape((-1, 1, 2))
cv2.polylines(
annotated_frame,
[points],
isClosed=False,
color=colors[cls],
thickness=4,
)
return annotated_frame
metrics.py代码如下,
# Ultralytics YOLO 🚀, AGPL-3.0 license
"""Model validation metrics."""
import numpy as np
def bbox_ioa(box1, box2, iou=False, eps=1e-7):
"""
Calculate the intersection over box2 area given box1 and box2. Boxes are in x1y1x2y2 format.
Args:
box1 (np.ndarray): A numpy array of shape (n, 4) representing n bounding boxes.
box2 (np.ndarray): A numpy array of shape (m, 4) representing m bounding boxes.
iou (bool): Calculate the standard IoU if True else return inter_area/box2_area.
eps (float, optional): A small value to avoid division by zero. Defaults to 1e-7.
Returns:
(np.ndarray): A numpy array of shape (n, m) representing the intersection over box2 area.
"""
# Get the coordinates of bounding boxes
b1_x1, b1_y1, b1_x2, b1_y2 = box1.T
b2_x1, b2_y1, b2_x2, b2_y2 = box2.T
# Intersection area
inter_area = (np.minimum(b1_x2[:, None], b2_x2) - np.maximum(b1_x1[:, None], b2_x1)).clip(0) * (
np.minimum(b1_y2[:, None], b2_y2) - np.maximum(b1_y1[:, None], b2_y1)
).clip(0)
# Box2 area
area = (b2_x2 - b2_x1) * (b2_y2 - b2_y1)
if iou:
box1_area = (b1_x2 - b1_x1) * (b1_y2 - b1_y1)
area = area + box1_area[:, None] - inter_area
# Intersection over box2 area
return inter_area / (area + eps)
def batch_probiou(obb1, obb2, eps=1e-7):
"""
Calculate the prob IoU between oriented bounding boxes, https://arxiv.org/pdf/2106.06072v1.pdf.
Args:
obb1 ( np.ndarray): A tensor of shape (N, 5) representing ground truth obbs, with xywhr format.
obb2 ( np.ndarray): A tensor of shape (M, 5) representing predicted obbs, with xywhr format.
eps (float, optional): A small value to avoid division by zero. Defaults to 1e-7.
Returns:
(np.ndarray): A tensor of shape (N, M) representing obb similarities.
"""
x1, y1 = np.split(obb1[..., :2], 2, axis=-1)
x2, y2 = (x.squeeze(-1)[None] for x in np.split(obb2[..., :2],2, axis=-1))
a1, b1, c1 = _get_covariance_matrix(obb1)
a2, b2, c2 = (x.squeeze(-1)[None] for x in _get_covariance_matrix(obb2))
t1 = (
((a1 + a2) * np.power(y1 - y2, 2) + (b1 + b2) * np.power(x1 - x2, 2)) / ((a1 + a2) * (b1 + b2) - np.power(c1 + c2, 2) + eps)
) * 0.25
t2 = (((c1 + c2) * (x2 - x1) * (y1 - y2)) / ((a1 + a2) * (b1 + b2) - np.power(c1 + c2, 2) + eps)) * 0.5
t3 = np.log(
((a1 + a2) * (b1 + b2) - np.power(c1 + c2, 2))
/ (4 * np.clip(a1 * b1 - np.power(c1, 2),0, np.inf) * np.sqrt(np.clip(a2 * b2 - np.power(c2, 2), 0, np.inf)) + eps)
+ eps
) * 0.5
bd = np.clip(t1 + t2 + t3, eps, 100.0)
hd = np.sqrt(1.0 - np.exp(-bd) + eps)
return 1 - hd
def _get_covariance_matrix(boxes):
"""
Generating covariance matrix from obbs.
Args:
boxes (np.ndarray): A tensor of shape (N, 5) representing rotated bounding boxes, with xywhr format.
Returns:
(np.ndarray): Covariance metrixs corresponding to original rotated bounding boxes.
"""
# Gaussian bounding boxes, ignore the center points (the first two columns) because they are not needed here.
gbbs = np.concatenate((np.power(boxes[:, 2:4],2) / 12, boxes[:, 4:]), axis=-1)
a, b, c = np.split(gbbs, 3, axis=-1)
cos = np.cos(c)
sin = np.sin(c)
cos2 = np.power(cos, 2)
sin2 = np.power(sin, 2)
return a * cos2 + b * sin2, a * sin2 + b * cos2, (a - b) * cos * sin
代码编写之检测代码yolov5.py实现:
import os
import json
import cv2
from StreamManagerApi import StreamManagerApi, MxDataInput
import numpy as np
from plots import box_label, colors
from utils import scale_coords, xyxy2xywh, is_legal, preproc
from track import TRACK
from names import names
import time
class YOLOV5(object):
def __init__(self):
# init stream manager
self.streamManagerApi = StreamManagerApi()
ret = self.streamManagerApi.InitManager()
if ret != 0:
print("Failed to init Stream manager, ret=%s" % str(ret))
exit()
# create streams by pipeline config file
with open("./pipeline/jidou.pipeline", 'rb') as f:
pipelineStr = f.read()
ret = self.streamManagerApi.CreateMultipleStreams(pipelineStr)
if ret != 0:
print("Failed to create Stream, ret=%s" % str(ret))
exit()
def process(self, image):
# Construct the input of the stream
dataInput = MxDataInput()
h0, w0 = image.shape[:2]
r = 640 / max(h0, w0) # ratio
input_shape = (640, 640)
pre_img = preproc(image, input_shape)[0]
pre_img = np.ascontiguousarray(pre_img)
image_bytes = cv2.imencode('.jpg', pre_img)[1].tobytes()
dataInput.data = image_bytes
# Inputs data to a specified stream based on streamName.
STREAMNAME = b'classification+detection'
INPLUGINID = 0
uniqueId = self.streamManagerApi.SendDataWithUniqueId(STREAMNAME, INPLUGINID, dataInput)
if uniqueId < 0:
print("Failed to send data to stream.")
exit()
# Obtain the inference result by specifying streamName and uniqueId.
inferResult = self.streamManagerApi.GetResultWithUniqueId(STREAMNAME, uniqueId, 10000)
if inferResult.errorCode != 0:
print("GetResultWithUniqueId error. errorCode=%d, errorMsg=%s" % (
inferResult.errorCode, inferResult.data.decode()))
exit()
results = json.loads(inferResult.data.decode())
track_results = [{"id":None, "className":[],"cls":[],"conf":[], "xywh":[]}]
for num, info in enumerate(results['MxpiObject']):
xyxy = [int(info['x0']), int(info['y0']), int(info['x1']), int(info['y1'])]
xyxy = scale_coords(pre_img.shape[:2], np.array(xyxy), image.shape[:2])
classVec = info["classVec"]
track_results[0]["className"].append(names[classVec[0]["classId"]])
track_results[0]["cls"].append(classVec[0]["classId"])
track_results[0]["conf"].append(classVec[0]["confidence"])
track_results[0]["xywh"].append([xyxy[0], xyxy[1], xyxy[2]-xyxy[0], xyxy[3]-xyxy[1]])
track_results[0]["cls"] = np.array(track_results[0]["cls"])
track_results[0]["conf"] = np.array(track_results[0]["conf"])
track_results[0]["xywh"] = np.array(track_results[0]["xywh"])
return track_results
def __del__(self):
# destroy streams
self.streamManagerApi.DestroyAllStreams()
def draw(self, image, track_results):
# draw the result and save image
for index, info in enumerate(track_results[0]["xywh"]):
xyxy = [int(info[0]), int(info[1]), int(info[0])+int(info[2]), int(info[1])+int(info[3])]
classVec = int(track_results[0]["cls"][index])
conf = float(track_results[0]["conf"][index])
if track_results[0]["id"] is not None:
id = int(track_results[0]["id"][index])
else:
id = ""
label = f'{names[classVec]} {conf:.4f}'
annotated_frame = box_label(image, xyxy, label, color=colors[classVec])
return annotated_frame
def test_img():
# read image
ORI_IMG_PATH = "./test_images/00004.jpg"
image = cv2.imread(ORI_IMG_PATH, 1)
yolov5 = YOLOV5()
track_results = yolov5.process(image)
print(track_results)
save_img = yolov5.draw(image, track_results)
cv2.imwrite('./result.jpg', save_img)
def test_video():
yolov5 = YOLOV5()
tracker = TRACK()
# Open the video file
video_path = "./test_images/jidou.mp4"
cap = cv2.VideoCapture(video_path)
fourcc = cv2.VideoWriter_fourcc('X', 'V', 'I', 'D') # 确定视频被保存后的编码格式
output = cv2.VideoWriter("output.mp4", fourcc, 20, (1280, 720)) # 创建VideoWriter类对象
# Loop through the video frames
while cap.isOpened():
# Read a frame from the video
success, frame = cap.read()
if success:
# Run YOLOv8 tracking on the frame, persisting tracks between frames
t1 = time.time()
track_results = yolov5.process(frame)
t2 = time.time()
track_results = tracker.track(track_results, frame)
t3 = time.time()
annotated_frame = tracker.draw(frame, track_results)
t4 = time.time()
print("time", t2-t1, t3-t2, t4-t3, t4-t1)
output.write(annotated_frame)
# Display the annotated frame
#cv2.imshow("YOLOv8 Tracking", annotated_frame)
# Break the loop if 'q' is pressed
if cv2.waitKey(1) & 0xFF == ord("q"):
break
else:
# Break the loop if the end of the video is reached
break
# Release the video capture object and close the display window
cap.release()
cv2.destroyAllWindows()
if __name__ == '__main__':
#test_img()
test_video()
最终整体代码目录结构:
最终效果: