目标检测 pytorch复现R-CNN目标检测项目
- 1、R-CNN目标检测项目基本流程思路
- 2、项目实现
- 1 、数据集下载:
- 2、车辆数据集抽取
- 3、创建分类器数据集
- 3、微调二分类网络模型
- 4、分类器训练
- 5、边界框回归器训练
- 6、效果测试
目标检测 R-CNN论文详细讲解
1、R-CNN目标检测项目基本流程思路
2、项目实现
项目整体源码以上传至CSDN
本文档实现了R-CNN算法进行目标检测的完整过程,包括
数据集创建
卷积神经网络训练
分类器训练
边界框回归器训练
目标检测器实现
1 、数据集下载:
本次复现使用PASCAL VOC2007数据集
执行代码0_pascal_voc.py
# -*- coding: utf-8 -*-
"""
@date: 2020/2/29 下午2:51
@file: pascal_voc.py
@author: zj
@description: 加载PASCAL VOC 2007数据集
"""
import cv2
import numpy as np
from torchvision.datasets import VOCDetection
def draw_box_with_text(img, object_list):
"""
绘制边框及其分类概率
:param img:
:param object_list:
:return:
"""
for object_ in object_list:
print(object_)
xmin, ymin, xmax, ymax = [int(temp) for temp in list(object_["bndbox"].values())]
name=object_["name"]
cv2.rectangle(img, (xmin, ymin), (xmax, ymax), (0, 0, 255), 3)
cv2.putText(img,name, (xmin-10, ymin-5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)
return img
if __name__ == '__main__':
"""
下载PASCAL VOC数据集
"""
dataset = VOCDetection('../../data', year='2007', image_set='trainval', download=False)
print(len(dataset))
img, target = dataset.__getitem__(500) # 利用迭代器的方式获取到第1000张图片,返回图像数据与标注信息
img = np.array(img)
print(target)
print("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^")
print(target["annotation"].keys())
print("****************************************")
print(target["annotation"]["object"])
print("---------------------------------------")
# 进行数据可视化操作
img_copy = img.copy()
draw_img=draw_box_with_text(img_copy,target["annotation"]["object"])
print(img.shape)
cv2.imshow('draw_img', draw_img)
cv2.imshow('img', img)
cv2.waitKey(0)
cv2.destroyAllWindows()
2、车辆数据集抽取
本文列举训练车辆检测流程,所以需要利用VOCdevkit-VOC2007-ImageSets-Main目录下的文件抽取出包含有car标签的对应数据:
执行代码2_pascal_voc_car.py
# -*- coding: utf-8 -*-
"""
@date: 2020/2/29 下午2:43
@file: pascal_voc_car.py
@author: zj
@description: 从PASCAL VOC 2007数据集中抽取类别Car。保留1/10的数目
"""
import os
import shutil
import random
import numpy as np
import xmltodict
from util import check_dir
suffix_xml = '.xml'
suffix_jpeg = '.jpg'
car_train_path = '../../data/VOCdevkit/VOC2007/ImageSets/Main/car_train.txt'
car_val_path = '../../data/VOCdevkit/VOC2007/ImageSets/Main/car_val.txt'
voc_annotation_dir = '../../data/VOCdevkit/VOC2007/Annotations/'
voc_jpeg_dir = '../../data/VOCdevkit/VOC2007/JPEGImages/'
car_root_dir = '../../data/voc_car/'
def parse_train_val(data_path):
"""
提取指定类别图像
"""
samples = []
with open(data_path, 'r') as file:
lines = file.readlines()
for line in lines:
res = line.strip().split(' ')
if len(res) == 3 and int(res[2]) == 1:
samples.append(res[0])
return np.array(samples)
def sample_train_val(samples):
"""
随机采样样本,减少数据集个数(留下1/10)
"""
for name in ['train', 'val']:
dataset = samples[name]
length = len(dataset)
random_samples = random.sample(range(length), int(length / 10))
# print(random_samples)
new_dataset = dataset[random_samples]
samples[name] = new_dataset
return samples
# def parse_car(sample_list):
# """
# 遍历所有的标注文件,筛选包含car的样本
# """
#
# car_samples = list()
# for sample_name in sample_list:
# annotation_path = os.path.join(voc_annotation_dir, sample_name + suffix_xml)
# with open(annotation_path, 'rb') as f:
# xml_dict = xmltodict.parse(f)
# # print(xml_dict)
#
# bndboxs = list()
# objects = xml_dict['annotation']['object']
# if isinstance(objects, list):
# for obj in objects:
# obj_name = obj['name']
# difficult = int(obj['difficult'])
# if 'car'.__eq__(obj_name) and difficult != 1:
# car_samples.append(sample_name)
# elif isinstance(objects, dict):
# obj_name = objects['name']
# difficult = int(objects['difficult'])
# if 'car'.__eq__(obj_name) and difficult != 1:
# car_samples.append(sample_name)
# else:
# pass
#
# return car_samples
def save_car(car_samples, data_root_dir, data_annotation_dir, data_jpeg_dir):
"""
保存类别Car的样本图片和标注文件
"""
for sample_name in car_samples:
src_annotation_path = os.path.join(voc_annotation_dir, sample_name + suffix_xml)
dst_annotation_path = os.path.join(data_annotation_dir, sample_name + suffix_xml)
shutil.copyfile(src_annotation_path, dst_annotation_path)
src_jpeg_path = os.path.join(voc_jpeg_dir, sample_name + suffix_jpeg)
dst_jpeg_path = os.path.join(data_jpeg_dir, sample_name + suffix_jpeg)
shutil.copyfile(src_jpeg_path, dst_jpeg_path)
csv_path = os.path.join(data_root_dir, 'car.csv')
np.savetxt(csv_path, np.array(car_samples), fmt='%s')
if __name__ == '__main__':
samples = {'train': parse_train_val(car_train_path), 'val': parse_train_val(car_val_path)}
print(samples)
# samples = sample_train_val(samples)
# print(samples)
check_dir(car_root_dir)
for name in ['train', 'val']:
data_root_dir = os.path.join(car_root_dir, name)
data_annotation_dir = os.path.join(data_root_dir, 'Annotations')
data_jpeg_dir = os.path.join(data_root_dir, 'JPEGImages')
check_dir(data_root_dir)
check_dir(data_annotation_dir)
check_dir(data_jpeg_dir)
save_car(samples[name], data_root_dir, data_annotation_dir, data_jpeg_dir)
print('done')
得到文件如下:
3、创建分类器数据集
通过选择性搜索算法的质量模式获取候选建议,然后计算候选建议与标注边界框的IoU,当建议框的IoU大于0,且小于0.3,并且大于最大标注框的1/5时,才为负样本
正样本为标注边界框
执行代码:3_create_classifier_data.py
# -*- coding: utf-8 -*-
"""
@date: 2020/3/1 下午7:17
@file: create_classifier_data.py
@author: zj
@description: 创建分类器数据集
"""
import random
import numpy as np
import shutil
import time
import cv2
import os
import xmltodict
import selectivesearch
from util import check_dir
from util import parse_car_csv
from util import parse_xml
from util import iou
from util import compute_ious
# train
# positive num: 625
# negative num: 366028
# val
# positive num: 625
# negative num: 321474
def parse_annotation_jpeg(annotation_path, jpeg_path, gs):
"""
获取正负样本(注:忽略属性difficult为True的标注边界框)
正样本:标注边界框
负样本:IoU大于0,小于等于0.3为负样本。为了进一步限制负样本数目,其大小必须大于标注框的1/5
"""
img = cv2.imread(jpeg_path)
selectivesearch.config(gs, img, strategy='q')
# 计算候选建议
rects = selectivesearch.get_rects(gs)
# 获取标注边界框
bndboxs = parse_xml(annotation_path)
print("bndboxs:",bndboxs) #((),())
# 标注框大小
# 获得当前标注文件中面积最大的标注框面积
maximum_bndbox_size = 0
for bndbox in bndboxs:
xmin, ymin, xmax, ymax = bndbox
bndbox_size = (ymax - ymin) * (xmax - xmin)
if bndbox_size > maximum_bndbox_size:
maximum_bndbox_size = bndbox_size
# 获取候选建议和标注边界框的IoU
iou_list = compute_ious(rects, bndboxs)
positive_list = list()
negative_list = list()
for i in range(len(iou_list)):
xmin, ymin, xmax, ymax = rects[i]
rect_size = (ymax - ymin) * (xmax - xmin)
iou_score = iou_list[i]
if 0 < iou_score <= 0.3 and rect_size > maximum_bndbox_size / 5.0:
# 负样本
negative_list.append(rects[i])
else:
pass
return bndboxs, negative_list
if __name__ == '__main__':
car_root_dir = '../../data/voc_car/'
classifier_root_dir = '../../data/classifier_car/'
check_dir(classifier_root_dir)
gs = selectivesearch.get_selective_search()
for name in ['train', 'val']:
src_root_dir = os.path.join(car_root_dir, name)
src_annotation_dir = os.path.join(src_root_dir, 'Annotations')
src_jpeg_dir = os.path.join(src_root_dir, 'JPEGImages')
dst_root_dir = os.path.join(classifier_root_dir, name)
dst_annotation_dir = os.path.join(dst_root_dir, 'Annotations')
dst_jpeg_dir = os.path.join(dst_root_dir, 'JPEGImages')
check_dir(dst_root_dir)
check_dir(dst_annotation_dir)
check_dir(dst_jpeg_dir)
total_num_positive = 0
total_num_negative = 0
samples = parse_car_csv(src_root_dir)
# 复制csv文件
src_csv_path = os.path.join(src_root_dir, 'car.csv')
dst_csv_path = os.path.join(dst_root_dir, 'car.csv')
shutil.copyfile(src_csv_path, dst_csv_path)
for sample_name in samples:
print("在处理--------:",sample_name)
since = time.time()
src_annotation_path = os.path.join(src_annotation_dir, sample_name + '.xml')
src_jpeg_path = os.path.join(src_jpeg_dir, sample_name + '.jpg')
# 获取正负样本
positive_list, negative_list = parse_annotation_jpeg(src_annotation_path, src_jpeg_path, gs) # positive_list:标注框列表,negative_list:经过ioc分析后的推荐框列表
total_num_positive += len(positive_list)
total_num_negative += len(negative_list)
# 拼接csv文件的本地保存路径
dst_annotation_positive_path = os.path.join(dst_annotation_dir, sample_name + '_1' + '.csv')
dst_annotation_negative_path = os.path.join(dst_annotation_dir, sample_name + '_0' + '.csv')
dst_jpeg_path = os.path.join(dst_jpeg_dir, sample_name + '.jpg')
# 保存图片
shutil.copyfile(src_jpeg_path, dst_jpeg_path)
# 保存正负样本标注
np.savetxt(dst_annotation_positive_path, np.array(positive_list), fmt='%d', delimiter=' ')
np.savetxt(dst_annotation_negative_path, np.array(negative_list), fmt='%d', delimiter=' ')
time_elapsed = time.time() - since
print('parse {}.png in {:.0f}m {:.0f}s'.format(sample_name, time_elapsed // 60, time_elapsed % 60))
print('%s positive num: %d' % (name, total_num_positive))
print('%s negative num: %d' % (name, total_num_negative))
print('done')
3、微调二分类网络模型
PyTorch提供了AlexNet的预训练模型
二分类模型:即1:代表car;0:代表背景
训练参数:
批量处理:每次训练128个图像,其中32个正样本,96个负样本
输入模型图像:缩放到(227, 227),随机水平翻转,进行归一化操作
优化器:使用SGD:学习率为1e-3,动量大小为0.9
随步长衰减:每隔7轮衰减一次,衰减因子为0.1
迭代次数:25轮
模型在训练过程中呈现过拟合现象,可考虑调整学习率、添加权重衰减以及更换优化器的方式:
学习率从1e-3调整为1e-4
添加L2权重衰减,衰减因子为1e-4
使用Adam替换SGD
微调实现:finetune.py
自定义微调数据集类:py/utils/data/custom_finetune_dataset.py
自定义批量采样器类:py/utils/data/custom_batch_sampler.py
执行代码:finetune.py
# -*- coding: utf-8 -*-
"""
@date: 2020/3/1 上午9:54
@file: finetune.py
@author: zj
@description:
"""
import os,cv2
import copy
import time
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
import torchvision.models as models
import numpy as np
from py.utils.data.custom_finetune_dataset import CustomFinetuneDataset
from py.utils.data.custom_batch_sampler import CustomBatchSampler
from py.utils.data.util import check_dir
import os
os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"
print("------------------")
def load_data(data_root_dir):
transform = transforms.Compose([
transforms.ToPILImage(),
transforms.Resize((227, 227)),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])
data_loaders = {}
data_sizes = {}
for name in ['train', 'val']:
data_dir = os.path.join(data_root_dir, name)
data_set = CustomFinetuneDataset(data_dir, transform=transform) # 加载数据集,改写相应的迭代器方法
data_sampler = CustomBatchSampler(data_set.get_positive_num(), data_set.get_negative_num(), 32, 96) # 此处表示是将32张正例图像与96张负例图像组成128张混合数据
data_loader = DataLoader(data_set, batch_size=128, sampler=data_sampler, num_workers=8, drop_last=True)
data_loaders[name] = data_loader # 迭代器
data_sizes[name] = data_sampler.__len__() # 得到的是正例框体与负例框体的汇总数量
return data_loaders, data_sizes
def train_model(data_loaders, model, criterion, optimizer, lr_scheduler, num_epochs=25, device=None):
since = time.time()
best_model_weights = copy.deepcopy(model.state_dict())
best_acc = 0.0
for epoch in range(num_epochs):
print('Epoch {}/{}'.format(epoch, num_epochs - 1))
print('-' * 10)
# Each epoch has a training and validation phase
for phase in ['train', 'val']:
if phase == 'train':
model.train() # Set model to training mode
else:
model.eval() # Set model to evaluate mode
running_loss = 0.0
running_corrects = 0
# Iterate over data.
for inputs, labels in data_loaders[phase]:
inputs = inputs.to(device)
labels = labels.to(device)
# zero the parameter gradients
optimizer.zero_grad() # 进行梯度清零操作
# forward
# track history if only in train
with torch.set_grad_enabled(phase == 'train'):
outputs = model(inputs)
_, preds = torch.max(outputs, 1)
loss = criterion(outputs, labels)
# backward + optimize only if in training phase
if phase == 'train':
loss.backward()
optimizer.step()
# statistics
running_loss += loss.item() * inputs.size(0)
running_corrects += torch.sum(preds == labels.data)
if phase == 'train':
lr_scheduler.step()
epoch_loss = running_loss / data_sizes[phase]
epoch_acc = running_corrects.double() / data_sizes[phase]
print('{} Loss: {:.4f} Acc: {:.4f}'.format(
phase, epoch_loss, epoch_acc))
# deep copy the model
if phase == 'val' and epoch_acc > best_acc:
best_acc = epoch_acc
best_model_weights = copy.deepcopy(model.state_dict())
print()
time_elapsed = time.time() - since
print('Training complete in {:.0f}m {:.0f}s'.format(
time_elapsed // 60, time_elapsed % 60))
print('Best val Acc: {:4f}'.format(best_acc))
# load best model weights
model.load_state_dict(best_model_weights)
return model
if __name__ == '__main__':
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
data_loaders, data_sizes = load_data('./data/classifier_car') # 返回迭代器,样本量
# AlexNet预训练模型
model = models.alexnet(pretrained=True)
print(model)
data_loader = data_loaders["train"]
num_features = model.classifier[6].in_features
model.classifier[6] = nn.Linear(num_features, 2)
print(model)
model = model.to(device)
inputs,targets = next(data_loader.__iter__())
print(inputs[0].size(),type(inputs[0]))
trans=transforms.ToPILImage()
print(type(trans(inputs[0])))
print(targets)
print(inputs.shape)
titles = ["True" if i.item() else "False" for i in targets[0:60]]
images = [np.array(trans(im)) for im in inputs[0:60]]
from images_handle import show_image
show_image(images,titles=titles,num_cols=12)
# temp_=inputs.numpy()
# for temp__ in temp_:
# temp__=temp__.transpose(1,2,0)
# cv2.imshow(" ",temp__)
# cv2.waitKey(0)
# cv2.destroyAllWindows()
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=1e-3, momentum=0.9)
lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)
best_model = train_model(data_loaders, model, criterion, optimizer, lr_scheduler, device=device, num_epochs=25)
# 保存最好的模型参数
check_dir('./models')
torch.save(best_model.state_dict(), 'models/alexnet_car.pth')
4、分类器训练
R-CNN在完成卷积模型的微调后,额外使用了线性SVM分类器,采用负样本挖掘方法进行训练,参考Hard Negative Mining,即针对正样本较少,负样本过多问题,在训练过程中,测试阶段中识别错误的负样本放置到训练集中用于训练
SVM损失函数设置为折页损失:
def hinge_loss(outputs, labels):
"""
折页损失计算
:param outputs: 大小为(N, num_classes)
:param labels: 大小为(N)
:return: 损失值
"""
num_labels = len(labels)
corrects = outputs[range(num_labels), labels].unsqueeze(0).T
# 最大间隔
margin = 1.0
margins = outputs - corrects + margin
loss = torch.sum(torch.max(margins, 1)[0]) / len(labels)
# # 正则化强度
# reg = 1e-3
# loss += reg * torch.sum(weight ** 2)
return loss
负样本挖掘
实现流程如下:
设置初始训练集,正负样本数比值为1:1(以正样本数目为基准)
每轮训练完成后,使用分类器对剩余负样本进行检测,如果检测为正,则加入到训练集中
重新训练分类器,重复第二步,直到检测精度开始收敛
训练参数
学习率:1e-4
动量:0.9
随步长衰减:每隔4轮衰减一次,参数因子α=0.1
迭代次数:10
批量处理:每次训练128个图像,其中32个正样本,96个负样本
执行代码:linear_svm.py
# -*- coding: utf-8 -*-
"""
@date: 2020/3/1 下午2:38
@file: linear_svm.py
@author: zj
@description:
"""
import time
import copy
import os
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
from torchvision.models import alexnet
from py.utils.data.custom_classifier_dataset import CustomClassifierDataset
from py.utils.data.custom_hard_negative_mining_dataset import CustomHardNegativeMiningDataset
from py.utils.data.custom_batch_sampler import CustomBatchSampler
from py.utils.data.util import check_dir
from py.utils.data.util import save_model
batch_positive = 32
batch_negative = 96
batch_total = 128
def load_data(data_root_dir):
transform = transforms.Compose([
transforms.ToPILImage(),
transforms.Resize((227, 227)),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])
data_loaders = {}
data_sizes = {}
remain_negative_list = list()
for name in ['train', 'val']:
data_dir = os.path.join(data_root_dir, name)
data_set = CustomClassifierDataset(data_dir, transform=transform)
if name is 'train':
"""
使用hard negative mining方式
初始正负样本比例为1:1。由于正样本数远小于负样本,所以以正样本数为基准,在负样本集中随机提取同样数目负样本作为初始负样本集
"""
positive_list = data_set.get_positives() # 返回正例样本数据 形如 [{"rect":[12,25,227,96],"image_id":1},{"rect":[12,25,227,96],"image_id":1},...]
negative_list = data_set.get_negatives() # 返回负总例样本数据 形如 [{"rect":[12,25,227,96],"image_id":0},{"rect":[12,25,227,96],"image_id":0},...]
init_negative_idxs = random.sample(range(len(negative_list)), len(positive_list)) # 从负总样本集中随机抽取一定的数量的样本,用于当做负样本,训练模型(数据索引)
init_negative_list = [negative_list[idx] for idx in range(len(negative_list)) if idx in init_negative_idxs] # 表示从负总样本中挑选出来的与正样本数量一样的负样本
remain_negative_list = [negative_list[idx] for idx in range(len(negative_list))
if idx not in init_negative_idxs] # 负总样本中剔除了挑选的负样本剩余的负样本
data_set.set_negative_list(init_negative_list) # 重新定义负样本
data_loaders['remain'] = remain_negative_list
sampler = CustomBatchSampler(data_set.get_positive_num(), data_set.get_negative_num(),
batch_positive, batch_negative) # 传入的第一个参数是正样本的总量,第二个参数是负样本的总量
# 创建迭代器
data_loader = DataLoader(data_set, batch_size=batch_total, sampler=sampler, num_workers=8, drop_last=True)
data_loaders[name] = data_loader
data_sizes[name] = len(sampler) # 得到训练集样本的总量
return data_loaders, data_sizes
def hinge_loss(outputs, labels):
"""
折页损失计算
1、针对每个样本上对不同分类的分数,选择不是该样本真实分类的分数与该样本真实分类的分数进行比较,如果该分数大于1,小于真实分类上的分数,则loss为0
2、反之,该样本的loss为该分数加1再减去该样本在真实分类上的分数
3、对所有的样本都按照此方法进行计算得到每个样本的loss,然后将它们加在一起凑成总loss值,并除以样本数以求平均
:param outputs: 大小为(N, num_classes)
:param labels: 大小为(N) 样本真实分类标签
:return: 损失值
Li=
"""
num_labels = len(labels)
corrects = outputs[range(num_labels), labels].unsqueeze(0).T
# 最大间隔
margin = 1.0
margins = outputs - corrects + margin
loss = torch.sum(torch.max(margins, 1)[0]) / len(labels)
# # 正则化强度
# reg = 1e-3
# loss += reg * torch.sum(weight ** 2)
return loss
def add_hard_negatives(hard_negative_list, negative_list, add_negative_list):
for item in hard_negative_list:
if len(add_negative_list) == 0:
# 第一次添加负样本
negative_list.append(item)
add_negative_list.append(list(item['rect']))
if list(item['rect']) not in add_negative_list:
negative_list.append(item)
add_negative_list.append(list(item['rect']))
def get_hard_negatives(preds, cache_dicts):
"""
困难样本挖掘函数
:param preds:
:param cache_dicts:
:return:
"""
fp_mask = preds == 1 # 找寻到苦难样本,,,,得到掩码
tn_mask = preds == 0
fp_rects = cache_dicts['rect'][fp_mask].numpy()
fp_image_ids = cache_dicts['image_id'][fp_mask].numpy()
tn_rects = cache_dicts['rect'][tn_mask].numpy()
tn_image_ids = cache_dicts['image_id'][tn_mask].numpy()
hard_negative_list = [{'rect': fp_rects[idx], 'image_id': fp_image_ids[idx]} for idx in range(len(fp_rects))]
easy_negatie_list = [{'rect': tn_rects[idx], 'image_id': tn_image_ids[idx]} for idx in range(len(tn_rects))]
return hard_negative_list, easy_negatie_list
def train_model(data_loaders, model, criterion, optimizer, lr_scheduler, num_epochs=25, device=None):
since = time.time()
best_model_weights = copy.deepcopy(model.state_dict())
best_acc = 0.0
for epoch in range(num_epochs):
print('Epoch {}/{}'.format(epoch, num_epochs - 1))
print('-' * 10)
# Each epoch has a training and validation phase
for phase in ['train', 'val']:
if phase == 'train':
model.train() # Set model to training mode
else:
model.eval() # Set model to evaluate mode
running_loss = 0.0
running_corrects = 0
# 输出正负样本数
data_set = data_loaders[phase].dataset
print('{} - positive_num: {} - negative_num: {} - data size: {}'.format(
phase, data_set.get_positive_num(), data_set.get_negative_num(), data_sizes[phase]))
# Iterate over data.
for inputs, labels, cache_dicts in data_loaders[phase]: # for循环,其会调用迭代器 cache_dicts:形如:[{"rect":[12,25,227,96],"image_id":1},{"rect":[12,25,227,96],"image_id":1},...]
inputs = inputs.to(device)
labels = labels.to(device)
# zero the parameter gradients
optimizer.zero_grad()
# forward
# track history if only in train
with torch.set_grad_enabled(phase == 'train'):
outputs = model(inputs)
# print(outputs.shape)
_, preds = torch.max(outputs, 1) # 返回最大项的索引值
loss = criterion(outputs, labels) # 计算损失函数
# backward + optimize only if in training phase
if phase == 'train':
loss.backward()
optimizer.step()
# statistics
running_loss += loss.item() * inputs.size(0) # inputs.size(0):为图像的批次数
running_corrects += torch.sum(preds == labels.data)
if phase == 'train':
lr_scheduler.step()
epoch_loss = running_loss / data_sizes[phase] # data_sizes[phase] 训练集或者样本集样本总数
epoch_acc = running_corrects.double() / data_sizes[phase]
print('{} Loss: {:.4f} Acc: {:.4f}'.format(
phase, epoch_loss, epoch_acc))
# deep copy the model
if phase == 'val' and epoch_acc > best_acc:
best_acc = epoch_acc
best_model_weights = copy.deepcopy(model.state_dict())
"""
# 每一轮训练完成后,测试剩余负样本集,进行hard negative mining 难分辨负样本挖掘
"""
train_dataset = data_loaders['train'].dataset
remain_negative_list = data_loaders['remain'] # 获取到负总样本数据集中除去训练集中负样本的剩余负样本
jpeg_images = train_dataset.get_jpeg_images() # 返回加载的图像数据列表
transform = train_dataset.get_transform() # 返回数据图像处理方式列表
with torch.set_grad_enabled(False):
remain_dataset = CustomHardNegativeMiningDataset(remain_negative_list, jpeg_images, transform=transform) # 初始化艰难搜索对象,改写迭代器
remain_data_loader = DataLoader(remain_dataset, batch_size=batch_total, num_workers=8, drop_last=True)
# 获取训练数据集的负样本集
negative_list = train_dataset.get_negatives() # 训练样本集数据
# 记录后续增加的负样本
add_negative_list = data_loaders.get('add_negative', []) # 创建一个键为“add_negative”的列表,用于存放负样本数据
running_corrects = 0
# Iterate over data.
for inputs, labels, cache_dicts in remain_data_loader:
inputs = inputs.to(device)
labels = labels.to(device)
# zero the parameter gradients
optimizer.zero_grad()
outputs = model(inputs)
# print(outputs.shape)
_, preds = torch.max(outputs, 1)
running_corrects += torch.sum(preds == labels.data)
hard_negative_list, easy_neagtive_list = get_hard_negatives(preds.cpu().numpy(), cache_dicts) # 困难样本挖掘函数
add_hard_negatives(hard_negative_list, negative_list, add_negative_list)
remain_acc = running_corrects.double() / len(remain_negative_list)
print('remiam negative size: {}, acc: {:.4f}'.format(len(remain_negative_list), remain_acc))
# 训练完成后,重置负样本,进行hard negatives mining
train_dataset.set_negative_list(negative_list) # 将训练集负样本进行更新
tmp_sampler = CustomBatchSampler(train_dataset.get_positive_num(), train_dataset.get_negative_num(),
batch_positive, batch_negative)
data_loaders['train'] = DataLoader(train_dataset, batch_size=batch_total, sampler=tmp_sampler,
num_workers=8, drop_last=True)
data_loaders['add_negative'] = add_negative_list
# 重置数据集大小
data_sizes['train'] = len(tmp_sampler)
# 每训练一轮就保存
save_model(model, 'models/linear_svm_alexnet_car_%d.pth' % epoch)
time_elapsed = time.time() - since
print('Training complete in {:.0f}m {:.0f}s'.format(
time_elapsed // 60, time_elapsed % 60))
print('Best val Acc: {:4f}'.format(best_acc))
# load best model weights
model.load_state_dict(best_model_weights)
return model
if __name__ == '__main__':
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
# device = 'cpu'
data_loaders, data_sizes = load_data('./data/classifier_car')
# 加载CNN模型
model_path = './models/alexnet_car.pth'
model = alexnet() # 因为此处不是进行预训练,而是加载模型,所以没有参数pretrained=True
num_classes = 2
num_features = model.classifier[6].in_features
model.classifier[6] = nn.Linear(num_features, num_classes) # 将模型结构调整成二分类
# model.load_state_dict(torch.load(model_path))
model.load_state_dict(torch.load(model_path, map_location='cpu'))
model.eval() # 表示进入估算模式下
# 固定特征提取
for param in model.parameters():
param.requires_grad = False # 冻结各层模型参数
# 创建SVM分类器(即再重新定义最后一层,重新训练该层,采用的损失函数是折页损失)
model.classifier[6] = nn.Linear(num_features, num_classes)
# print(model)
model = model.to(device)
criterion = hinge_loss # 损失函数
# 由于初始训练集数量很少,所以降低学习率
optimizer = optim.SGD(model.parameters(), lr=1e-4, momentum=0.9)
# 共训练10轮,每隔4论减少一次学习率
lr_schduler = optim.lr_scheduler.StepLR(optimizer, step_size=4, gamma=0.1)
best_model = train_model(data_loaders, model, criterion, optimizer, lr_schduler, num_epochs=10, device=device)
# 保存最好的模型参数
save_model(best_model, 'models/best_linear_svm_alexnet_car.pth')
5、边界框回归器训练
使用SVM分类器对候选建议进行分类后,使用对应类别的边界框回归器(bounding-box regression)预测其坐标偏移值,这一操作能够进一步提高检测精度
执行代码4_create_bbox_regression_data.py
# -*- coding: utf-8 -*-
"""
@date: 2020/4/3 下午7:19
@file: create_bbox_regression_data.py
@author: zj
@description: 创建边界框回归数据集
"""
import os
import shutil
import numpy as np
import util as util
# 正样本边界框数目:37222
if __name__ == '__main__':
"""
从voc_car/train目录中提取标注边界框坐标
从classifier_car/train目录中提取训练集正样本坐标(IoU>=0.5),进一步提取IoU>0.6的边界框
数据集保存在bbox_car目录下
"""
voc_car_train_dir = '../../data/voc_car/train'
# ground truth
gt_annotation_dir = os.path.join(voc_car_train_dir, 'Annotations') # 标注xml所在文件夹
jpeg_dir = os.path.join(voc_car_train_dir, 'JPEGImages') # 图像数据集所在文件夹
classifier_car_train_dir = '../../data/classifier_car/train'
# positive
positive_annotation_dir = os.path.join(classifier_car_train_dir, 'Annotations')
dst_root_dir = '../../data/bbox_regression/'
dst_jpeg_dir = os.path.join(dst_root_dir, 'JPEGImages')
dst_bndbox_dir = os.path.join(dst_root_dir, 'bndboxs')
dst_positive_dir = os.path.join(dst_root_dir, 'positive')
util.check_dir(dst_root_dir)
util.check_dir(dst_jpeg_dir)
util.check_dir(dst_bndbox_dir)
util.check_dir(dst_positive_dir)
samples = util.parse_car_csv(voc_car_train_dir) # 读取csv文件,获取到图像名称
res_samples = list()
total_positive_num = 0
for sample_name in samples:
# 提取正样本边界框坐标(IoU>=0.5)
positive_annotation_path = os.path.join(positive_annotation_dir, sample_name + '_1.csv')
positive_bndboxes = np.loadtxt(positive_annotation_path, dtype=np.int, delimiter=' ')
# 提取标注边界框
gt_annotation_path = os.path.join(gt_annotation_dir, sample_name + '.xml')
bndboxs = util.parse_xml(gt_annotation_path)
# 计算符合条件(IoU>0.6)的候选建议
positive_list = list()
if len(positive_bndboxes.shape) == 1 and len(positive_bndboxes) != 0:
scores = util.iou(positive_bndboxes, bndboxs)
if np.max(scores) > 0.6:
positive_list.append(positive_bndboxes)
elif len(positive_bndboxes.shape) == 2:
for positive_bndboxe in positive_bndboxes:
scores = util.iou(positive_bndboxe, bndboxs)
if np.max(scores) > 0.6:
positive_list.append(positive_bndboxe)
else:
pass
# 如果存在正样本边界框(IoU>0.6),那么保存相应的图片以及标注边界框
if len(positive_list) > 0:
# 保存图片
jpeg_path = os.path.join(jpeg_dir, sample_name + ".jpg")
dst_jpeg_path = os.path.join(dst_jpeg_dir, sample_name + ".jpg")
shutil.copyfile(jpeg_path, dst_jpeg_path)
# 保存标注边界框
dst_bndbox_path = os.path.join(dst_bndbox_dir, sample_name + ".csv")
np.savetxt(dst_bndbox_path, bndboxs, fmt='%s', delimiter=' ')
# 保存正样本边界框
dst_positive_path = os.path.join(dst_positive_dir, sample_name + ".csv")
np.savetxt(dst_positive_path, np.array(positive_list), fmt='%s', delimiter=' ')
total_positive_num += len(positive_list)
res_samples.append(sample_name)
print('save {} done'.format(sample_name))
else:
print('-------- {} 不符合条件'.format(sample_name))
dst_csv_path = os.path.join(dst_root_dir, 'car.csv')
np.savetxt(dst_csv_path, res_samples, fmt='%s', delimiter=' ')
print('total positive num: {}'.format(total_positive_num))
print('done')
训练逻辑回归模型代码:
bbox_regression.py
# -*- coding: utf-8 -*-
"""
@date: 2020/4/3 下午6:55
@file: bbox_regression.py
@author: zj
@description: 边界框回归训练
"""
import os
import copy
import time,cv2
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
from torchvision.models import AlexNet
from py.utils.data.custom_bbox_regression_dataset import BBoxRegressionDataset
import py.utils.data.util as util
def load_data(data_root_dir):
transform = transforms.Compose([
transforms.ToPILImage(),
transforms.Resize((227, 227)),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])
data_set = BBoxRegressionDataset(data_root_dir, transform=transform)
data_loader = DataLoader(data_set, batch_size=128, shuffle=True, num_workers=8)
return data_loader
def train_model(data_loader, feature_model, model, criterion, optimizer, lr_scheduler, num_epochs=25, device=None):
since = time.time()
model.train() # Set model to training mode
loss_list = list()
for epoch in range(num_epochs):
print('Epoch {}/{}'.format(epoch, num_epochs - 1))
print('-' * 10)
running_loss = 0.0
# Iterate over data.
for inputs, targets in data_loader:
inputs = inputs.to(device) # 标注的图像截取片段
targets = targets.float().to(device) # 物体标注框
# for input in inputs:
# cv2.imshow("pppp",input.numpy().transpose(2,1,0))
# cv2.waitKey(0)
# cv2.destroyAllWindows()
features = feature_model.features(inputs) # 得到经过卷积神经网络提取到的数据特征
features = torch.flatten(features, 1) # 将批次的数据进行拉伸操作,由于第0维度代表数据的批次,所以从第1万维度开始拉伸
# zero the parameter gradients
optimizer.zero_grad()
# forward
outputs = model(features) # 将提取到的数据特征喂入回归模型,训练边框回归器
loss = criterion(outputs, targets)
loss.backward()
optimizer.step()
# statistics
running_loss += loss.item() * inputs.size(0)
lr_scheduler.step()
epoch_loss = running_loss / data_loader.dataset.__len__()
loss_list.append(epoch_loss)
print('{} Loss: {:.4f}'.format(epoch, epoch_loss))
# 每训练一轮就保存
util.save_model(model, './models/bbox_regression_%d.pth' % epoch)
print()
time_elapsed = time.time() - since
print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
return loss_list
def get_model(device=None):
# 加载CNN模型
model = AlexNet(num_classes=2) # 设置为二分类
model.load_state_dict(torch.load('./models/best_linear_svm_alexnet_car.pth',map_location='cpu')) # 加载本地的模型文件
model.eval()
# 取消梯度追踪
for param in model.parameters():
param.requires_grad = False
if device:
model = model.to(device)
return model
if __name__ == '__main__':
data_loader = load_data('./data/bbox_regression')
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
feature_model = get_model(device) # 加载本地的模型文件
print("--------------------->\n", feature_model)
# AlexNet最后一个池化层计算得到256*6*6输出
in_features = 256 * 6 * 6
out_features = 4
model = nn.Linear(in_features, out_features)
print(model)
model.to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4, weight_decay=1e-4)
lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)
loss_list = train_model(data_loader, feature_model, model, criterion, optimizer, lr_scheduler, device=device,
num_epochs=12)
util.plot_loss(loss_list)
6、效果测试
# -*- coding: utf-8 -*-
"""
@date: 2020/3/2 上午8:07
@file: car_detector.py
@author: zj
@description: 车辆类别检测器
"""
import time
import copy
import cv2
import numpy as np
import torch
import torch.nn as nn
from torchvision.models import alexnet
import torchvision.transforms as transforms
import selectivesearch
import utils.data.util as util
def get_device():
return torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
def get_transform():
# 数据转换
transform = transforms.Compose([
transforms.ToPILImage(),
transforms.Resize((227, 227)),
# transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])
return transform
def get_model(device=None):
# 加载CNN模型
model = alexnet()
num_classes = 2
num_features = model.classifier[6].in_features
model.classifier[6] = nn.Linear(num_features, num_classes)
model.load_state_dict(torch.load('./models/best_linear_svm_alexnet_car.pth',map_location='cpu'))
model.eval() # 设置成推理模式
print(model)
# 取消梯度追踪
for param in model.parameters():
param.requires_grad = False
if device:
model = model.to(device)
return model
def draw_box_with_text(img, rect_list, score_list):
"""
绘制边框及其分类概率
:param img:
:param rect_list:
:param score_list:
:return:
"""
for i in range(len(rect_list)):
xmin, ymin, xmax, ymax = rect_list[i]
score = score_list[i]
cv2.rectangle(img, (xmin, ymin), (xmax, ymax), color=(0, 0, 255), thickness=1)
cv2.putText(img, "{:.3f}".format(score), (xmin, ymin), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
def nms(rect_list, score_list):
"""
非最大抑制
:param rect_list: list,大小为[N, 4]
:param score_list: list,大小为[N]
"""
nms_rects = list()
nms_scores = list()
rect_array = np.array(rect_list)
score_array = np.array(score_list)
# 一次排序后即可
# 按分类概率从大到小排序
idxs = np.argsort(score_array)[::-1]
rect_array = rect_array[idxs] # 即重新排序后的目标框列表
score_array = score_array[idxs] # 即重新排序后的概率值列表
thresh = 0.3
while len(score_array) > 0:
# 添加分类概率最大的边界框
nms_rects.append(rect_array[0])
nms_scores.append(score_array[0])
rect_array = rect_array[1:]
score_array = score_array[1:]
length = len(score_array)
if length <= 0:
break
# 计算IoU
iou_scores = util.iou(np.array(nms_rects[len(nms_rects) - 1]), rect_array) # 即每次都使用nms_rects列表中的最后一个框体与rect_array循环进行NMS
# print(iou_scores)
# 去除重叠率大于等于thresh的边界框
print("-------->",np.where(iou_scores < thresh))
idxs = np.where(iou_scores < thresh)[0]
rect_array = rect_array[idxs]
score_array = score_array[idxs]
return nms_rects, nms_scores
if __name__ == '__main__':
device = get_device() # 设置GPU或者CPU
print(device)
transform = get_transform() # 设置数据预处理方式
model = get_model(device=device) # 加载模型
# 创建selectivesearch对象
gs = selectivesearch.get_selective_search()
test_img_path = '../imgs/000007.jpg'
test_xml_path = '../imgs/000007.xml'
# test_img_path = '../imgs/000012.jpg'
# test_xml_path = '../imgs/000012.xml'
img = cv2.imread(test_img_path)
dst = copy.deepcopy(img)
bndboxs = util.parse_xml(test_xml_path) # 读取xml文件中的坐标框
for bndbox in bndboxs:
xmin, ymin, xmax, ymax = bndbox
cv2.rectangle(dst, (xmin, ymin), (xmax, ymax), color=(0, 255, 0), thickness=1)
# 候选区域建议
selectivesearch.config(gs, img, strategy='f')
rects = selectivesearch.get_rects(gs) # 传入图像,利用selective research算法生成一定数量的候选框
print('候选区域建议数目: %d' % len(rects))
# softmax = torch.softmax()
svm_thresh = 0.60
# 保存正样本边界框以及
score_list = list()
positive_list = list()
# tmp_score_list = list()
# tmp_positive_list = list()
start = time.time()
for rect in rects:
xmin, ymin, xmax, ymax = rect
rect_img = img[ymin:ymax, xmin:xmax]
rect_transform = transform(rect_img).to(device) # 对数据进行预处理,传入到设备上
output = model(rect_transform.unsqueeze(0))[0]
if torch.argmax(output).item() == 1:
"""
预测为汽车
"""
probs = torch.softmax(output, dim=0).cpu().numpy()
# tmp_score_list.append(probs[1])
# tmp_positive_list.append(rect)
if probs[1] >= svm_thresh: # 如果softmax求得的概率大于svm_thresh阈值,则为汽车
score_list.append(probs[1]) # 将概率加入到score_list列表
positive_list.append(rect) # 将框体加入到positive_list列表
# cv2.rectangle(dst, (xmin, ymin), (xmax, ymax), color=(0, 0, 255), thickness=2)
print(rect, output, probs)
end = time.time()
print('detect time: %d s' % (end - start))
# tmp_img2 = copy.deepcopy(dst)
# draw_box_with_text(tmp_img2, tmp_positive_list, tmp_score_list)
# cv2.imshow('tmp', tmp_img2)
#
# tmp_img = copy.deepcopy(dst)
# draw_box_with_text(tmp_img, positive_list, score_list)
# cv2.imshow('tmp2', tmp_img)
nms_rects, nms_scores = nms(positive_list, score_list) # 进行非极大值抑制算法
print(nms_rects)
print(nms_scores)
draw_box_with_text(dst, nms_rects, nms_scores)
cv2.imshow('img', dst)
cv2.waitKey(0)