SORT is a simple online, real-time multi-object tracking algorithm. Its key ideas are:
(1) use IoU as the association metric between targets in consecutive frames;
(2) use a Kalman filter to predict each target's current position;
(3) associate detection boxes to targets with the Hungarian algorithm;
(4) apply a probation period to screen out false detections.
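Per frame, these pieces compose into a single predict-associate-update loop. A minimal sketch (illustrative pseudocode only; the names do not match the actual code below):

# one SORT iteration per frame (illustrative pseudocode):
#   predicted = [trk.predict() for trk in tracks]              # Kalman prediction
#   matches, new_dets, lost_trks = hungarian(iou_matrix(detections, predicted))
#   update each matched track with its detection
#   start a new (probationary) track for every unmatched detection
#   drop tracks unmatched for more than max_age frames;
#   report only tracks with at least min_hits consecutive matches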
Python implementation (using YOLOv5 as the detector, with ONNX model inference):
yolov5.py
import cv2
import numpy as np
import onnxruntime
CLASSES=['person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus', 'train', 'truck', 'boat', 'traffic light',
'fire hydrant', 'stop sign', 'parking meter', 'bench', 'bird', 'cat', 'dog', 'horse', 'sheep', 'cow',
'elephant', 'bear', 'zebra', 'giraffe', 'backpack', 'umbrella', 'handbag', 'tie', 'suitcase', 'frisbee',
'skis', 'snowboard', 'sports ball', 'kite', 'baseball bat', 'baseball glove', 'skateboard', 'surfboard',
'tennis racket', 'bottle', 'wine glass', 'cup', 'fork', 'knife', 'spoon', 'bowl', 'banana', 'apple',
'sandwich', 'orange', 'broccoli', 'carrot', 'hot dog', 'pizza', 'donut', 'cake', 'chair', 'couch',
'potted plant', 'bed', 'dining table', 'toilet', 'tv', 'laptop', 'mouse', 'remote', 'keyboard', 'cell phone',
'microwave', 'oven', 'toaster', 'sink', 'refrigerator', 'book', 'clock', 'vase', 'scissors', 'teddy bear',
'hair drier', 'toothbrush'] # the 80 COCO classes
use_letterbox = True
input_shape = (640,640)
class YOLOV5():
def __init__(self,onnxpath):
        self.onnx_session=onnxruntime.InferenceSession(onnxpath,providers=['CUDAExecutionProvider', 'CPUExecutionProvider']) # providers are tried in order: prefer CUDA, fall back to CPU
self.input_name=self.get_input_name()
self.output_name=self.get_output_name()
#-------------------------------------------------------
    #   get the names of the model inputs and outputs
#-------------------------------------------------------
def get_input_name(self):
input_name=[]
for node in self.onnx_session.get_inputs():
input_name.append(node.name)
return input_name
def get_output_name(self):
output_name=[]
for node in self.onnx_session.get_outputs():
output_name.append(node.name)
return output_name
#-------------------------------------------------------
    #   build the input feed from the image tensor
#-------------------------------------------------------
def get_input_feed(self,img_tensor):
input_feed={}
for name in self.input_name:
input_feed[name]=img_tensor
return input_feed
#-------------------------------------------------------
    #   1. read the image with cv2 and resize it
    #   2. convert BGR to RGB and HWC to CHW
    #   3. normalize the image
    #   4. add a batch dimension
    #   5. run onnx_session inference
#-------------------------------------------------------
def inference(self,img):
if use_letterbox:
or_img=letterbox(img, input_shape)
else:
or_img=cv2.resize(img, input_shape)
        img=or_img[:,:,::-1].transpose(2,0,1) # BGR to RGB and HWC to CHW
img=img.astype(dtype=np.float32)
img/=255.0
img=np.expand_dims(img,axis=0)
input_feed=self.get_input_feed(img)
pred=self.onnx_session.run(None,input_feed)[0]
return pred,or_img
# dets: array [x,6], where the 6 values are x1, y1, x2, y2, score, class
# thresh: IoU threshold
def nms(dets, thresh):
x1 = dets[:, 0]
y1 = dets[:, 1]
x2 = dets[:, 2]
y2 = dets[:, 3]
#-------------------------------------------------------
    #   compute the box areas
    #   sort scores from high to low
#-------------------------------------------------------
areas = (y2 - y1 + 1) * (x2 - x1 + 1)
scores = dets[:, 4]
keep = []
index = scores.argsort()[::-1]
while index.size > 0:
i = index[0]
keep.append(i)
#-------------------------------------------------------
        #   compute the intersection area
        #   1. the boxes intersect
        #   2. the boxes do not intersect
#-------------------------------------------------------
x11 = np.maximum(x1[i], x1[index[1:]])
y11 = np.maximum(y1[i], y1[index[1:]])
x22 = np.minimum(x2[i], x2[index[1:]])
y22 = np.minimum(y2[i], y2[index[1:]])
w = np.maximum(0, x22 - x11 + 1)
h = np.maximum(0, y22 - y11 + 1)
overlaps = w * h
#-------------------------------------------------------
        #   compute the IoU between this box and the rest, and drop the
        #   duplicates (boxes with a large IoU); boxes with IoU below
        #   thresh are kept
#-------------------------------------------------------
ious = overlaps / (areas[i] + areas[index[1:]] - overlaps)
idx = np.where(ious <= thresh)[0]
index = index[idx + 1]
return keep
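# Quick illustrative check of nms (made-up inputs, not part of the pipeline):
# two heavily overlapping boxes and one separate box; the lower-scoring
# overlap is suppressed.
#   dets_demo = np.array([[ 0,  0, 10, 10, 0.9, 0],
#                         [ 1,  1, 11, 11, 0.8, 0],
#                         [50, 50, 60, 60, 0.7, 0]], dtype=np.float64)
#   nms(dets_demo, 0.5)  # -> [0, 2]; boxes 0 and 1 overlap with IoU ~ 0.70 > 0.5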
def xywh2xyxy(x):
# [x, y, w, h] to [x1, y1, x2, y2]
y = np.copy(x)
y[:, 0] = x[:, 0] - x[:, 2] / 2
y[:, 1] = x[:, 1] - x[:, 3] / 2
y[:, 2] = x[:, 0] + x[:, 2] / 2
y[:, 3] = x[:, 1] + x[:, 3] / 2
return y
def filter_box(org_box,conf_thres,iou_thres): # filter out useless boxes
#-------------------------------------------------------
    #   squeeze the singleton dimensions
    #   drop boxes whose confidence is below conf_thres
#-------------------------------------------------------
org_box=np.squeeze(org_box)
conf = org_box[..., 4] > conf_thres
    box = org_box[conf]
#-------------------------------------------------------
    #   take the class with the highest confidence via argmax
#-------------------------------------------------------
cls_cinf = box[..., 5:]
cls = []
for i in range(len(cls_cinf)):
cls.append(int(np.argmax(cls_cinf[i])))
all_cls = list(set(cls))
#-------------------------------------------------------
    #   filter each class separately:
    #   1. write the class index into the 6th column
    #   2. convert coordinates with xywh2xyxy
    #   3. get the indices of the boxes kept by non-maximum suppression
    #   4. use those indices to collect the kept boxes
#-------------------------------------------------------
output = []
for i in range(len(all_cls)):
curr_cls = all_cls[i]
curr_cls_box = []
curr_out_box = []
for j in range(len(cls)):
if cls[j] == curr_cls:
box[j][5] = curr_cls
curr_cls_box.append(box[j][:6])
curr_cls_box = np.array(curr_cls_box)
curr_cls_box = xywh2xyxy(curr_cls_box)
curr_out_box = nms(curr_cls_box,iou_thres)
for k in curr_out_box:
output.append(curr_cls_box[k])
output = np.array(output)
return output
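# For reference: the standard yolov5s 640x640 ONNX export outputs shape
# (1, 25200, 85) - 25200 candidate boxes, each with 4 box values (x, y, w, h),
# 1 objectness score and 80 class scores; filter_box reduces this to rows of
# [x1, y1, x2, y2, score, class].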
def letterbox(im, new_shape=(640, 640), color=(114, 114, 114)):
# Resize and pad image while meeting stride-multiple constraints
shape = im.shape[:2] # current shape [height, width]
# Scale ratio (new / old)
r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
# Compute padding
new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
dw, dh = (new_shape[1] - new_unpad[0])/2, (new_shape[0] - new_unpad[1])/2 # wh padding
top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
if shape[::-1] != new_unpad: # resize
im = cv2.resize(im, new_unpad, interpolation=cv2.INTER_LINEAR)
im = cv2.copyMakeBorder(im, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color) # add border
return im
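# Worked letterbox example (assuming the 1080x810 bus.jpg sample):
#   r = min(640/1080, 640/810) ~ 0.593, new_unpad = (480, 640)
#   dw, dh = 80, 0 -> resize to 480x640, then pad 80 gray (114) pixels
#   on the left and on the right to reach 640x640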
def scale_boxes(input_shape, boxes, shape):
# Rescale boxes (xyxy) from input_shape to shape
gain = min(input_shape[0] / shape[0], input_shape[1] / shape[1]) # gain = old / new
pad = (input_shape[1] - shape[1] * gain) / 2, (input_shape[0] - shape[0] * gain) / 2 # wh padding
boxes[..., [0, 2]] -= pad[0] # x padding
boxes[..., [1, 3]] -= pad[1] # y padding
boxes[..., :4] /= gain
boxes[..., [0, 2]] = boxes[..., [0, 2]].clip(0, shape[1]) # x1, x2
boxes[..., [1, 3]] = boxes[..., [1, 3]].clip(0, shape[0]) # y1, y2
return boxes
def draw(image,box_data):
box_data = scale_boxes(input_shape, box_data, image.shape)
boxes=box_data[...,:4].astype(np.int32)
scores=box_data[...,4]
classes=box_data[...,5].astype(np.int32)
for box, score, cl in zip(boxes, scores, classes):
        x1, y1, x2, y2 = box  # box is [x1, y1, x2, y2]
        print('class: {}, score: {}, coordinate: [{}, {}, {}, {}]'.format(CLASSES[cl], score, x1, y1, x2, y2))
        cv2.rectangle(image, (x1, y1), (x2, y2), (255, 0, 0), 1)
        cv2.putText(image, '{0} {1:.2f}'.format(CLASSES[cl], score), (x1, y1), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 1)
if __name__=="__main__":
model=YOLOV5('./model/yolov5s.onnx')
img=cv2.imread('bus.jpg')
output,_=model.inference(img)
outbox=filter_box(output,0.5,0.5)
draw(img,outbox)
cv2.imwrite('res.jpg',img)
main.py
import cv2
import numpy as np
from pathlib import Path
from filterpy.kalman import KalmanFilter
from scipy.optimize import linear_sum_assignment
import yolov5
def linear_assignment(cost_matrix):
x, y = linear_sum_assignment(cost_matrix)
return np.array(list(zip(x, y)))
def iou_batch(bb_test, bb_gt):
"""
From SORT: Computes IOU between two bboxes in the form [x1,y1,x2,y2]
"""
bb_gt = np.expand_dims(bb_gt, 0)
bb_test = np.expand_dims(bb_test, 1)
xx1 = np.maximum(bb_test[..., 0], bb_gt[..., 0])
yy1 = np.maximum(bb_test[..., 1], bb_gt[..., 1])
xx2 = np.minimum(bb_test[..., 2], bb_gt[..., 2])
yy2 = np.minimum(bb_test[..., 3], bb_gt[..., 3])
w = np.maximum(0., xx2 - xx1)
h = np.maximum(0., yy2 - yy1)
wh = w * h
o = wh / ((bb_test[..., 2] - bb_test[..., 0]) * (bb_test[..., 3] - bb_test[..., 1])
+ (bb_gt[..., 2] - bb_gt[..., 0]) * (bb_gt[..., 3] - bb_gt[..., 1]) - wh)
return(o)
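# The expand_dims calls above broadcast the boxes into an
# (num_detections, num_trackers) IoU matrix. Tiny illustrative check:
#   dets_demo = np.array([[ 0., 0., 10., 10.], [20., 20., 30., 30.]])
#   trks_demo = np.array([[ 0., 0., 10., 10.]])
#   iou_batch(dets_demo, trks_demo)  # -> [[1.], [0.]], shape (2, 1)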
def convert_bbox_to_z(bbox):
"""
Takes a bounding box in the form [x1,y1,x2,y2] and returns z in the form
[x,y,s,r] where x,y is the centre of the box and s is the scale/area and r is
the aspect ratio
"""
w = bbox[2] - bbox[0]
h = bbox[3] - bbox[1]
x = bbox[0] + w/2.
y = bbox[1] + h/2.
s = w * h #scale is just area
r = w / float(h)
return np.array([x, y, s, r]).reshape((4, 1))
def convert_x_to_bbox(x):
"""
Takes a bounding box in the centre form [x,y,s,r] and returns it in the form
[x1,y1,x2,y2] where x1,y1 is the top left and x2,y2 is the bottom right
"""
w = np.sqrt(x[2] * x[3])
h = x[2] / w
return np.array([x[0]-w/2.,x[1]-h/2.,x[0]+w/2.,x[1]+h/2.]).reshape((1,4))
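# The two conversions are inverses; quick sanity check with illustrative values:
#   z = convert_bbox_to_z(np.array([100., 100., 200., 300.]))
#   z.flatten()           # -> [150., 200., 20000., 0.5]  (cx, cy, area, aspect)
#   convert_x_to_bbox(z)  # -> [[100., 100., 200., 300.]]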
class KalmanBoxTracker(object):
"""
This class represents the internal state of individual tracked objects observed as bbox.
"""
count = 0
def __init__(self,bbox):
"""
Initialises a tracker using initial bounding box.
"""
#define constant velocity model
self.kf = KalmanFilter(dim_x=7, dim_z=4)
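        # state x = [u, v, s, r, du, dv, ds]: box centre (u, v), area s, aspect
        # ratio r, plus velocities for u, v and s; r is modelled as constant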
self.kf.F = np.array([[1,0,0,0,1,0,0],[0,1,0,0,0,1,0],[0,0,1,0,0,0,1],[0,0,0,1,0,0,0],[0,0,0,0,1,0,0],[0,0,0,0,0,1,0],[0,0,0,0,0,0,1]])
self.kf.H = np.array([[1,0,0,0,0,0,0],[0,1,0,0,0,0,0],[0,0,1,0,0,0,0],[0,0,0,1,0,0,0]])
self.kf.R[2:,2:] *= 10.
self.kf.P[4:,4:] *= 1000. #give high uncertainty to the unobservable initial velocities
self.kf.P *= 10.
self.kf.Q[-1,-1] *= 0.01
self.kf.Q[4:,4:] *= 0.01
self.kf.x[:4] = convert_bbox_to_z(bbox)
self.time_since_update = 0
self.id = KalmanBoxTracker.count
KalmanBoxTracker.count += 1
self.history = []
self.hits = 0
self.hit_streak = 0
self.age = 0
def update(self,bbox):
"""
Updates the state vector with observed bbox.
"""
self.time_since_update = 0
self.history = []
self.hits += 1
self.hit_streak += 1
self.kf.update(convert_bbox_to_z(bbox))
def predict(self):
"""
Advances the state vector and returns the predicted bounding box estimate.
"""
if(self.kf.x[6]+self.kf.x[2]<=0):
self.kf.x[6] *= 0.0
self.kf.predict()
self.age += 1
if(self.time_since_update>0):
self.hit_streak = 0
self.time_since_update += 1
self.history.append(convert_x_to_bbox(self.kf.x))
return self.history[-1]
def get_state(self):
"""
Returns the current bounding box estimate.
"""
return convert_x_to_bbox(self.kf.x)
def associate_detections_to_tracks(detections,trackers,iou_threshold = 0.3):
"""
Assigns detections to tracked object (both represented as bounding boxes)
Returns 3 lists of matches, unmatched_detections and unmatched_trackers
"""
if(len(trackers)==0):
return np.empty((0,2),dtype=int), np.arange(len(detections)), np.empty((0,5),dtype=int)
    # compute pairwise IoU, then call linear_assignment to match
iou_matrix = iou_batch(detections, trackers)
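    # if every detection overlaps at most one tracker above the threshold (and
    # vice versa), take those pairs as matches directly; otherwise run the
    # Hungarian algorithm on -IoU, which maximises the total matched IoU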
if min(iou_matrix.shape) > 0:
a = (iou_matrix > iou_threshold).astype(np.int32)
if a.sum(1).max() == 1 and a.sum(0).max() == 1:
matched_indices = np.stack(np.where(a), axis=1)
else:
matched_indices = linear_assignment(-iou_matrix)
else:
matched_indices = np.empty(shape=(0,2))
    # record unmatched detections and tracks
unmatched_detections = []
for d, det in enumerate(detections):
if(d not in matched_indices[:,0]):
unmatched_detections.append(d)
unmatched_trackers = []
for t, trk in enumerate(trackers):
if(t not in matched_indices[:,1]):
unmatched_trackers.append(t)
    # filter out matches with low IoU
matches = []
for m in matched_indices:
if(iou_matrix[m[0], m[1]]<iou_threshold):
unmatched_detections.append(m[0])
unmatched_trackers.append(m[1])
else:
matches.append(m.reshape(1,2))
if(len(matches)==0):
matches = np.empty((0,2),dtype=int)
else:
matches = np.concatenate(matches,axis=0)
return matches, np.array(unmatched_detections), np.array(unmatched_trackers)
class Sort(object):
def __init__(self, max_age=1, min_hits=3, iou_threshold=0.3):
"""
Sets key parameters for SORT
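        max_age: frames a track may go unmatched before it is deleted
        min_hits: consecutive matches required before a track is reported
        iou_threshold: minimum IoU for a detection-to-track match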
"""
self.max_age = max_age
self.min_hits = min_hits
self.iou_threshold = iou_threshold
self.trackers = []
self.frame_count = 0
def update(self, dets=np.empty((0, 5))):
"""
Params:
dets - a numpy array of detections in the format [[x1,y1,x2,y2,score],[x1,y1,x2,y2,score],...]
Requires: this method must be called once for each frame even with empty detections (use np.empty((0, 5)) for frames without detections).
        Returns a similar array, where the last column is the object ID.
NOTE: The number of objects returned may differ from the number of detections provided.
"""
self.frame_count += 1
# get predicted locations from existing trackers.
trks = np.zeros((len(self.trackers), 5))
ret = []
for t, trk in enumerate(trks):
pos = self.trackers[t].predict()[0]
trk[:] = [pos[0], pos[1], pos[2], pos[3], 0]
        # np.ma.masked_invalid masks entries that are NaN or inf;
        # np.ma.compress_rows then drops every row of the 2-D array that contains a masked value
trks = np.ma.compress_rows(np.ma.masked_invalid(trks))
matched, unmatched_dets, unmatched_trks = associate_detections_to_tracks(dets, trks, self.iou_threshold)
# update matched trackers with assigned detections
for m in matched:
self.trackers[m[1]].update(dets[m[0], :])
# create and initialise new trackers for unmatched detections
for i in unmatched_dets:
trk = KalmanBoxTracker(dets[i,:])
self.trackers.append(trk)
i = len(self.trackers)
        # iterate from back to front; report only tracks that were updated in the
        # current frame and whose hit streak is at least self.min_hits (unless
        # tracking has just started); delete trackers whose time since the last
        # update exceeds self.max_age
for trk in reversed(self.trackers):
d = trk.get_state()[0]
            if (trk.time_since_update < 1) and (trk.hit_streak >= self.min_hits or self.frame_count <= self.min_hits): # hit_streak: ignore the first few frames of a new target
ret.append(np.concatenate((d,[trk.id+1])).reshape(1,-1)) # +1 as MOT benchmark requires positive
i -= 1
if(trk.time_since_update > self.max_age):
self.trackers.pop(i)
if(len(ret)>0):
return np.concatenate(ret)
return np.empty((0,5))
if __name__ == '__main__':
mot_tracker = Sort(max_age=1, min_hits=3, iou_threshold=0.3) #create instance of the SORT tracker
colours = np.random.rand(32, 3) * 255
model = yolov5.YOLOV5('./model/yolov5s.onnx')
files = Path("./data").glob('*')
for file in files:
print(file)
image = cv2.imread(str(file))
output, _ = model.inference(image)
outbox = yolov5.filter_box(output, 0.5, 0.5)
outbox = yolov5.scale_boxes(yolov5.input_shape, outbox, image.shape)
outbox = outbox[:, :5]
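        # np.lexsort treats its last key as primary, so reversing the columns
        # sorts the detections by x1 (left to right) for a stable ID order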
outbox = outbox[np.lexsort(outbox[:,::-1].T)]
trackers = mot_tracker.update(outbox)
for d in trackers:
d = d.astype(np.int32)
            cv2.rectangle(image, (d[0], d[1]), (d[2], d[3]), colours[d[4]%32,:].tolist(), 1) # .tolist() so OpenCV accepts the colour
cv2.imshow('sort', image)
cv2.waitKey(50)
Result: (tracking demo image omitted)
Partial code of the C++ implementation:
/********************************************************************************
** @auth: taify
** @date: 2022/12/10
** @Ver : V1.0.0
** @desc: main function
*********************************************************************************/
#include <iostream>
#include <iomanip>
#include <ctime>
#include <opencv2/opencv.hpp>
#include "utils.h"
#include "yolov5.h"
#include "kalmanboxtracker.h"
#include "sort.h"
const int frame_nums = 71;
int main(int argc, char** argv)
{
Sort mot_tracker = Sort(1, 3, 0.3);
cv::dnn::Net net = cv::dnn::readNet("./model/yolov5s.onnx");
std::vector<cv::Vec3b> colors(32);
srand(time(0));
for (size_t i = 0; i < colors.size(); i++)
colors[i] = cv::Vec3b(rand() % 255, rand() % 255, rand() % 255);
std::string prefix = "./data/";
for (size_t i = 1; i <= frame_nums; i++)
{
std::stringstream ss;
ss << std::setw(6) << std::setfill('0') << i;
std::string frame_name = prefix + ss.str() + ".jpg";
std::cout << "******************************************************************* " << frame_name << std::endl;
cv::Mat frame = cv::imread(frame_name);
        cv::Mat image = frame.clone(), blob;
pre_process(image, blob);
std::vector<cv::Mat> outputs;
process(blob, net, outputs);
std::vector<cv::Rect> detections = post_process(image, outputs);
//std::sort(detections.begin(), detections.end(), [](cv::Rect rect1, cv::Rect rect2) {return rect1.x < rect2.x; });
std::vector<std::vector<float>> trackers = mot_tracker.update(detections);
//for (auto detection : detections)
// cv::rectangle(image, detection, cv::Scalar(0, 0, 255), 1);
for (auto tracker : trackers)
cv::rectangle(image, cv::Rect(tracker[0], tracker[1], tracker[2]- tracker[0], tracker[3]- tracker[1]), colors[int(tracker[4])%32], 1);
cv::imshow("image", image);
cv::waitKey(1);
}
cv::destroyAllWindows();
return 0;
}
Download links for the complete project code, model, and dataset: sort+yolov5
The implementation in this article also draws on the blog post: SORT 多目标跟踪算法笔记