240628_昇思学习打卡-Day10-SSD目标检测
今天我们来看SSD(Single Shot MultiBox Detector)算法,SSD是发布于2016年的一种目标检测算法,使用的是one-stage目标检测网络,意思就是说它只需要一步,就能把目标检测出来。
文章目录
- 240628_昇思学习打卡-Day10-SSD目标检测
- 网络结构思想
- Backbone Layer
- Extra Feature Layer
- 预测模块Detection Layer
- 网络结构代码
- 数据准备
- 数据采样
- 数据集创建
- 损失函数
- Metrics
- 训练过程
- 评估
当前目标检测主流算法分成两个类型:
1、two-stage方法:RCNN系列
先通过算法产生候选框,然后对这些候选框进行分类和回归。
2、one-stage方法:YOLO和SSD
直接通过主干网络给出类别位置信息,不需要区域生成。
网络结构思想
以下是SSD网络结构:
可以初步梳理一下这个结构:
首先输入300×300的三通道RGB图像,然后将其载入到VGG-16骨干网络,这里使用了VGG-16骨干网络的开始到Conv5的第三个卷积层(这里需要注意一下,SSD中使用的VGG-16是使用到Conv5_3的部分,也就是图中第一个大的虚线框,大的虚线框中还有一个小的实线框,上面写着Conv4_3,有一根线连接到最后预测,意思就是说训练到这个阶段时就得到了预测特征层1)。
Backbone Layer
经过VGG-16后输出的图像维度为19X19X512,经过一个3X3X1024卷积核(对应VGG-16的第一个全连接层)(下图VGG网络中蓝色部分),维度变成19X19X1024,在经过一个1X1X1024的卷积核(对应VGG的第二个全连接层),得到维度19X19X1024,此时得到预测特征层2。
Extra Feature Layer
在经过一个1X1X256的卷积核和1个3X3X512,步长为2,padding=1的卷积核,输出维度为10X10X512,得到预测特征层3。
再经过一个1X1X128的卷积核和1个3X3X256,步长为2,padding=1的卷积核,输出维度为5X5X256,得到预测特征层4。
再经过一个1X1X128的卷积核和1个3X3X256,步长为1,padding=0的卷积核,输出维度为3X3X256,得到预测特征层5。
再经过一个1X1X128的卷积核和1个3X3X256,步长为1,padding=0的卷积核,输出维度为1X1X256,得到预测特征层6。
至此网络结束。得到6个不同特征图检测不同尺度的目标,底层预测小目标(底层经过的卷积较少,其感受野较小,保留细节较多),高层预测大目标(经过卷积较多,感受野较大,抽象程度加深,注重整体信息)。
左图为原图,中间的是把原图划分成1个8X8的特征矩阵,其中每一个小方格所包含信息较少,适合预测小目标,比如左侧猫的图像,就是通过8X8的特征矩阵划分预测出的,右边的是把原图划分成1个4X4的特征矩阵,此时每个小方格的信息包含量都是中间图的四倍,所以更适合预测较大目标,比如左图中狗的图像。
说的更详细一些,比如,在中间图(8X8特征矩阵)左下的这个多个方块围起来的这个点,他在周围3X3的区域内生成了多个小方格(实际为4个),这里称为DefaultBox,是我们的预选框(又称先验框)(又称anchor),预选框就是说,不管这里面有没有东西,我们都先把他框起来,每个单元格都会有相同数量相同大小的预选框。此处两个蓝色的预选框可以比较好的契合猫的位置,就可以去和样本中标注的GtBox(Ground Truth Box,正确框)进行比较,计算出其中的交并比。
在右图(4X4特征矩阵)中,红色框对于猫来说,显然太大了,可以框选住猫,但是会导致很多空间的浪费,而这种浪费,在这里就变成了误差,所以我们就要用大框去框大的物体,比如此处表情比较奇怪的狗(镜头感是有的)。反过来说,使用8X8的特征矩阵中的小框框,根本框不住这么大一条狗,就这么个小玩意儿,拿过来想框大黄,大黄都得嗤笑。
说到DefaultBox了,我们就要看看其scale以及aspect设定
这里可能说的有点不明白,21{1/2}就是基于21这个数的1:2尺寸,21{2}就是基于21这个数的2:1尺寸,这两是下图中后面那个白车身上的蓝色框框,21{1}就是基于21这个数的1:1尺寸,就是白车身上那个小的黄色框框,以此类推,红车也同理。
刚才在前面讲解网络结构时所说的输出的用于预测的六个特征层就是这里这六个特征图层。1到6就是顺着刚才的输出顺序排列的,1就是最早输出,信息最少的那一个,6就是最晚输出,信息最多的哪一个。针对特征图层①⑤⑥,我们会发现他的默认框尺寸的数量比③④⑤要少,可以理解为大目标和小目标数量都相对偏少,每个像素点只用4个DefaultBox(上图白车),中等尺寸的目标偏多,每个像素点用6个DefaultBox(上图红车),我们采用更多的框去框选中等尺寸的目标。多尺度检测的方式,可以使得检测更加充分(SSD属于密集检测),更能检测出小目标。
此时有一个可以注意的小点,在SSD网络结构的倒数第二层写着:Detections:8732 per Class,这个是什么意思呢,就是这里咯:
注意看表格最后的默认框数量:
38
∗
38
∗
4
+
19
∗
19
∗
6
+
10
∗
10
∗
6
+
5
∗
5
∗
6
+
3
∗
3
∗
4
+
1
∗
1
∗
4
=
8732
38*38*4+19*19*6+10*10*6+5*5*6+3*3*4+1*1*4=8732
38∗38∗4+19∗19∗6+10∗10∗6+5∗5∗6+3∗3∗4+1∗1∗4=8732
每个特征层的每个点都会以上述规则生成大量密集的DefaultBox
预测模块Detection Layer
关于predictor的实现,这里就不截取原论文了,直接使用大佬给的图进行讲解
Predictor采用(c+4)×k个卷积核实现,其中,k是每个FeatureMap里面每个像素点周围的k个DefaultBox(取值为4或6),而对于每个DefaultBox,我们都要对其分类,一共分c类,方法不是直接的二值化确定,而是对该框中的内容和类别进行比较,进行一个契合度打分,而这个4,说的是每个DefaultBox的边界框回归参数,对于每个DefaultBox,我们会预测其中的中心坐标x、y的偏移量以及宽度weight、高度height的偏移量。
比如我们目前使用特征图层①进行预测,此时k=4,我们要做一个二十分类任务,此时c=21,因为要加一个背景类(背景类位于第一个格子,索引为0),此时对于每个FeatureMap里面每个像素点,周边有4个DefaultBox,对每个DefaultBox,都有21个框框,这里面放着他有几分像从前(咳咳,串戏)。这里面放着他与21个类别的相似度,比如他有0.3分像猫,0.4分像狗,0.5分什么都没有是背景,等等等等。同时对每个DefaultBox,我们要计算其中的物体中心与Box中心坐标x、y的偏移量以及宽度weight、高度height的偏移量。
网络结构代码
from mindspore import nn
def _make_layer(channels):
"""
构建卷积层序列。
参数:
channels: 一个列表,包含了卷积层的输入和输出通道数。
返回:
一个包含卷积层和激活函数的序列模型。
"""
in_channels = channels[0]
layers = []
for out_channels in channels[1:]:
layers.append(nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=3))
layers.append(nn.ReLU())
in_channels = out_channels
return nn.SequentialCell(layers)
class Vgg16(nn.Cell):
"""
VGG-16网络模型。
该类定义了VGG-16网络的结构,包括5个卷积块和一个池化层。
"""
def __init__(self):
"""
Vgg16类的初始化方法。
"""
super(Vgg16, self).__init__()
# 构建第一块卷积层
self.b1 = _make_layer([3, 64, 64])
# 构建第二块卷积层
self.b2 = _make_layer([64, 128, 128])
# 构建第三块卷积层
self.b3 = _make_layer([128, 256, 256, 256])
# 构建第四块卷积层
self.b4 = _make_layer([256, 512, 512, 512])
# 构建第五块卷积层
self.b5 = _make_layer([512, 512, 512, 512])
# 定义5个最大池化层
self.m1 = nn.MaxPool2d(kernel_size=2, stride=2, pad_mode='SAME')
self.m2 = nn.MaxPool2d(kernel_size=2, stride=2, pad_mode='SAME')
self.m3 = nn.MaxPool2d(kernel_size=2, stride=2, pad_mode='SAME')
self.m4 = nn.MaxPool2d(kernel_size=2, stride=2, pad_mode='SAME')
self.m5 = nn.MaxPool2d(kernel_size=3, stride=1, pad_mode='SAME')
def construct(self, x):
"""
VGG-16网络的前向传播方法。
参数:
x: 输入的张量。
返回:
block4: 第四块卷积层的输出,用于某些特征提取任务。
x: 经过全部卷积和池化层后的输出,用于最终的分类任务。
"""
# 经过第一块卷积层和池化层
# block1
x = self.b1(x)
x = self.m1(x)
# 经过第二块卷积层和池化层
# block2
x = self.b2(x)
x = self.m2(x)
# 经过第三块卷积层和池化层
# block3
x = self.b3(x)
x = self.m3(x)
# 经过第四块卷积层,保留输出用于中间特征提取
# block4
x = self.b4(x)
block4 = x
x = self.m4(x)
# 经过第五块卷积层和最后的池化层
# block5
x = self.b5(x)
x = self.m5(x)
return block4, x
# 导入MindSpore相关模块
import mindspore as ms
import mindspore.nn as nn
import mindspore.ops as ops
# 定义一个用于SSD模型中最后卷积层的辅助函数
def _last_conv2d(in_channel, out_channel, kernel_size=3, stride=1, pad_mod='same', pad=0):
"""
创建一个深度可分离卷积,用于SSD模型的最后卷积层。
参数:
in_channel: 输入通道数。
out_channel: 输出通道数。
kernel_size: 卷积核大小。
stride: 卷积步长。
pad_mod: 填充模式。
pad: 填充大小。
返回:
一个包含深度可分离卷积、批量归一化和ReLU激活函数的 SequentialCell。
"""
in_channels = in_channel
out_channels = in_channel
depthwise_conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride, pad_mode='same',
padding=pad, group=in_channels)
conv = nn.Conv2d(in_channel, out_channel, kernel_size=1, stride=1, padding=0, pad_mode='same', has_bias=True)
bn = nn.BatchNorm2d(in_channel, eps=1e-3, momentum=0.97,
gamma_init=1, beta_init=0, moving_mean_init=0, moving_var_init=1)
return nn.SequentialCell([depthwise_conv, bn, nn.ReLU6(), conv])
# 定义一个用于SSD模型中特征图合并的类
class FlattenConcat(nn.Cell):
"""
将多个特征图展平并合并。
该类用于处理SSD模型中来自不同层的特征图,通过展平每个特征图的维度并沿通道维度合并它们。
"""
def __init__(self):
"""
初始化FlattenConcat类。
"""
super(FlattenConcat, self).__init__()
self.num_ssd_boxes = 8732 # SSD模型预测的默认框数量
def construct(self, inputs):
"""
构建特征图的展平和合并过程。
参数:
inputs: 一个元组,包含多个待处理的特征图。
返回:
合并后的特征图。
"""
output = ()
batch_size = ops.shape(inputs[0])[0] # 获取批次大小
for x in inputs:
x = ops.transpose(x, (0, 2, 3, 1)) # 调整特征图的维度顺序
output += (ops.reshape(x, (batch_size, -1)),) # 展平特征图
res = ops.concat(output, axis=1) # 合并特征图
return ops.reshape(res, (batch_size, self.num_ssd_boxes, -1)) # 重塑结果
# 定义一个用于生成多个默认框的MultiBox类
class MultiBox(nn.Cell):
"""
MultiBox类用于生成多个默认框以及对应的类别预测和位置预测。
每个MultiBox层包含类置信度预测和位置预测两个部分。
"""
def __init__(self):
"""
初始化MultiBox类。
"""
super(MultiBox, self).__init__()
num_classes = 81 # 类别数量
out_channels = [512, 1024, 512, 256, 256, 256] # 各层的输出通道数
num_default = [4, 6, 6, 6, 4, 4] # 各层的默认框数量
loc_layers = [] # 位置预测层列表
cls_layers = [] # 类别预测层列表
for k, out_channel in enumerate(out_channels):
loc_layers += [_last_conv2d(out_channel, 4 * num_default[k],
kernel_size=3, stride=1, pad_mod='same', pad=0)]
cls_layers += [_last_conv2d(out_channel, num_classes * num_default[k],
kernel_size=3, stride=1, pad_mod='same', pad=0)]
self.multi_loc_layers = nn.CellList(loc_layers) # 初始化位置预测层
self.multi_cls_layers = nn.CellList(cls_layers) # 初始化类别预测层
self.flatten_concat = FlattenConcat() # 初始化特征图合并类
def construct(self, inputs):
"""
构建MultiBox的预测过程。
参数:
inputs: 一个元组,包含来自不同层的特征图。
返回:
位置预测和类别预测的结果。
"""
loc_outputs = ()
cls_outputs = ()
for i in range(len(self.multi_loc_layers)):
loc_outputs += (self.multi_loc_layers[i](inputs[i]),) # 获取位置预测结果
cls_outputs += (self.multi_cls_layers[i](inputs[i]),) # 获取类别预测结果
return self.flatten_concat(loc_outputs), self.flatten_concat(cls_outputs)
# 定义SSD300Vgg16模型类
class SSD300Vgg16(nn.Cell):
"""
SSD300Vgg16模型类。
该类定义了基于VGG16骨干网的SSD300模型结构。
"""
def __init__(self):
"""
初始化SSD300Vgg16模型类。
"""
super(SSD300Vgg16, self).__init__()
# 初始化VGG16骨干网
self.backbone = Vgg16()
# 初始化SSD的扩展层
self.b6_1 = nn.Conv2d(in_channels=512, out_channels=1024, kernel_size=3, padding=6, dilation=6, pad_mode='pad')
self.b6_2 = nn.Dropout(p=0.5)
self.b7_1 = nn.Conv2d(in_channels=1024, out_channels=1024, kernel_size=1)
self.b7_2 = nn.Dropout(p=0.5)
# 初始化额外的特征提取层
self.b8_1 = nn.Conv2d(in_channels=1024, out_channels=256, kernel_size=1, padding=1, pad_mode='pad')
self.b8_2 = nn.Conv2d(in_channels=256, out_channels=512, kernel_size=3, stride=2, pad_mode='valid')
self.b9_1 = nn.Conv2d(in_channels=512, out_channels=128, kernel_size=1, padding=1, pad_mode='pad')
self.b9_2 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=2, pad_mode='valid')
self.b10_1 = nn.Conv2d(in_channels=256, out_channels=128, kernel_size=1)
self.b10_2 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, pad_mode='valid')
self.b11_1 = nn.Conv2d(in_channels=256, out_channels=128, kernel_size=1)
self.b11_2 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, pad_mode='valid')
# 初始化MultiBox层,用于生成默认框和预测
self.multi_box = MultiBox()
def construct(self, x):
"""
构建SSD300Vgg16的前向传播过程。
参数:
x: 输入图像。
返回:
位置预测和类别预测的结果。
"""
# 使用VGG16骨干网提取特征
block4, x = self.backbone(x)
# 使用SSD的扩展层进一步提取特征
x = self.b6_1(x) # 1024
x = self.b6_2(x)
x = self.b7_1(x) # 1024
x = self.b7_2(x)
block7 = x
# 使用额外的特征提取层
x = self.b8_1(x) # 256
x = self.b8_2(x) # 512
block8 = x
x = self.b9_1(x) # 128
x = self.b9_2(x) # 256
block9 = x
x = self.b10_1(x) # 128
x = self.b10_2(x) # 256
block10 = x
x = self.b11_1(x) # 128
x = self.b11_2(x) # 256
block11 = x
# 使用MultiBox层生成默认框和预测
multi_feature = (block4, block7, block8, block9, block10, block11)
pred_loc, pred_label = self.multi_box(multi_feature)
if not self.training:
pred_label = ops.sigmoid(pred_label) # 非训练模式下,应用sigmoid激活函数
pred_loc = pred_loc.astype(ms.float32) # 确定输出类型为float32
pred_label = pred_label.astype(ms.float32)
return pred_loc, pred_label
数据准备
关于正负样本的选取问题。
正样本:
1、对于每个gtBox,去匹配IOU值最大的DefaultBox,归为正样本。
2、对于任意DefaultBox,只要与任意gtBox的IOU值大于0.5,就可以归为正样本
在实际应用任务中,一张图片的正样本往往只能选取到几个几十个,对于上面所说的8732个总样本来说,如果把剩下的样本全部作为负样本,会引入一个巨大的正负样本不平衡的问题。就理解一下嘛,你给我10道正确的题,8000道错题,我到底是来学把题做对来了还是把题做错来了。所以此时一般保持正负样本比例为1:3。
负样本:
计算所有DefaultBox的confidence loss(置信度损失),这个值越大就意味着网络把这个DefaultBox预测为目标的概率就越大,我们就把这个选出来作为负样本进行训练。这就好比你在学函数了,我给你1+1的题你肯定不会做错,做着也没什么意思,我就要给你比较难的题,容易做错的题,才能加强你的学习效果。
from download import download
dataset_url = "https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/datasets/ssd_datasets.zip"
path = "./"
path = download(dataset_url, path, kind="zip", replace=True)
coco_root = "./datasets/"
anno_json = "./datasets/annotations/instances_val2017.json"
train_cls = ['background', 'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus',
'train', 'truck', 'boat', 'traffic light', 'fire hydrant',
'stop sign', 'parking meter', 'bench', 'bird', 'cat', 'dog',
'horse', 'sheep', 'cow', 'elephant', 'bear', 'zebra',
'giraffe', 'backpack', 'umbrella', 'handbag', 'tie',
'suitcase', 'frisbee', 'skis', 'snowboard', 'sports ball',
'kite', 'baseball bat', 'baseball glove', 'skateboard',
'surfboard', 'tennis racket', 'bottle', 'wine glass', 'cup',
'fork', 'knife', 'spoon', 'bowl', 'banana', 'apple',
'sandwich', 'orange', 'broccoli', 'carrot', 'hot dog', 'pizza',
'donut', 'cake', 'chair', 'couch', 'potted plant', 'bed',
'dining table', 'toilet', 'tv', 'laptop', 'mouse', 'remote',
'keyboard', 'cell phone', 'microwave', 'oven', 'toaster', 'sink',
'refrigerator', 'book', 'clock', 'vase', 'scissors',
'teddy bear', 'hair drier', 'toothbrush']
train_cls_dict = {}
for i, cls in enumerate(train_cls):
train_cls_dict[cls] = i
数据采样
为了使模型对于各种输入对象大小和形状更加鲁棒,SSD算法每个训练图像通过以下选项之一随机采样:
- 使用整个原始输入图像
- 采样一个区域,使采样区域和原始图片最小的交并比重叠为0.1,0.3,0.5,0.7或0.9
- 随机采样一个区域
每个采样区域的大小为原始图像大小的[0.3,1],长宽比在1/2和2之间。如果真实标签框中心在采样区域内,则保留两者重叠部分作为新图片的真实标注框。在上述采样步骤之后,将每个采样区域大小调整为固定大小,并以0.5的概率水平翻转。
# 导入OpenCV和NumPy库,用于图像处理和数值计算
import cv2
import numpy as np
# 生成一个在指定范围内的随机数
def _rand(a=0., b=1.):
"""
生成一个在[a, b)范围内的随机浮点数。
参数:
a -- 随机数范围的下限 (默认为0)
b -- 随机数范围的上限 (默认为1)
返回:
一个在[a, b)范围内的随机浮点数。
"""
return np.random.rand() * (b - a) + a
# 计算两个框的交集
def intersect(box_a, box_b):
"""
计算两个框的交集区域的面积。
参数:
box_a -- 第一个框,格式为[N, 4]的NumPy数组,每个框由左上角(x, y)和右下角(x, y)定义
box_b -- 第二个框,格式与box_a相同
返回:
交集区域的面积,格式为[N]的NumPy数组
"""
"""Compute the intersect of two sets of boxes."""
max_yx = np.minimum(box_a[:, 2:4], box_b[2:4])
min_yx = np.maximum(box_a[:, :2], box_b[:2])
inter = np.clip((max_yx - min_yx), a_min=0, a_max=np.inf)
return inter[:, 0] * inter[:, 1]
# 计算两个框的Jaccard相似度
def jaccard_numpy(box_a, box_b):
"""
计算两个框的Jaccard相似度,即交集除以并集的面积比。
参数:
box_a -- 第一个框,格式为[N, 4]的NumPy数组
box_b -- 第二个框,格式与box_a相同
返回:
Jaccard相似度数组,格式为[N]的NumPy数组
"""
"""Compute the jaccard overlap of two sets of boxes."""
inter = intersect(box_a, box_b)
area_a = ((box_a[:, 2] - box_a[:, 0]) *
(box_a[:, 3] - box_a[:, 1]))
area_b = ((box_b[2] - box_b[0]) *
(box_b[3] - box_b[1]))
union = area_a + area_b - inter
return inter / union
# 随机裁剪图像和对应的框
def random_sample_crop(image, boxes):
"""
随机裁剪图像和对应的边界框,保持裁剪区域与原边界框的IOU大于等于指定的阈值。
参数:
image -- 要裁剪的图像,格式为[H, W, C]的NumPy数组
boxes -- 图像中对应的边界框,格式为[N, 4]的NumPy数组,每个框由左上角(x, y)和右下角(x, y)定义
返回:
裁剪后的图像和对应的边界框,格式与输入相同
"""
"""Crop images and boxes randomly."""
height, width, _ = image.shape
# 随机选择一个最小IOU阈值,或不选择(表示没有限制)
min_iou = np.random.choice([None, 0.1, 0.3, 0.5, 0.7, 0.9])
if min_iou is None:
return image, boxes
for _ in range(50):
image_t = image
# 随机确定裁剪区域的宽度和高度
w = _rand(0.3, 1.0) * width
h = _rand(0.3, 1.0) * height
# 确保裁剪区域的宽高比在0.5到2之间
# aspect ratio constraint b/t .5 & 2
if h / w < 0.5 or h / w > 2:
continue
# 随机确定裁剪区域的左上角位置
left = _rand() * (width - w)
top = _rand() * (height - h)
# 构造裁剪区域的矩形
rect = np.array([int(top), int(left), int(top + h), int(left + w)])
# 计算裁剪区域与所有边界框的IOU
overlap = jaccard_numpy(boxes, rect)
# 确定哪些边界框与裁剪区域有重叠
drop_mask = overlap > 0
# 如果没有重叠的边界框,跳过当前裁剪尝试
if not drop_mask.any():
continue
# 检查是否有边界框的IOU大于最小IOU阈值且小于最小IOU阈值+0.2,这样的框将被丢弃
if overlap[drop_mask].min() < min_iou and overlap[drop_mask].max() > (min_iou + 0.2):
continue
# 对图像进行裁剪
image_t = image_t[rect[0]:rect[2], rect[1]:rect[3], :]
# 计算边界框的中心位置
centers = (boxes[:, :2] + boxes[:, 2:4]) / 2.0
# 确定哪些边界框部分位于裁剪区域内
m1 = (rect[0] < centers[:, 0]) * (rect[1] < centers[:, 1])
m2 = (rect[2] > centers[:, 0]) * (rect[3] > centers[:, 1])
# 确定哪些边界框完全位于裁剪区域内
mask = m1 * m2 * drop_mask
# 如果没有边界框完全位于裁剪区域内,尝试下一次裁剪
if not mask.any():
continue
# 保留位于裁剪区域内的边界框,并调整它们的坐标到裁剪区域的参考系中
boxes_t = boxes[mask, :].copy()
boxes_t[:, :2] = np.maximum(boxes_t[:, :2], rect[:2])
boxes_t[:, :2] -= rect[:2]
boxes_t[:, 2:4] = np.minimum(boxes_t[:, 2:4], rect[2:4])
boxes_t[:, 2:4] -= rect[:2]
return image_t, boxes_t
# 如果50次尝试都未能找到合适的裁剪,返回原图像和边界框
return image, boxes
# 定义用于编码边界框的函数,用于处理边界框数据
def ssd_bboxes_encode(boxes):
"""
使用SSD方法对边界框进行编码。
参数:
boxes: 一个边界框列表,每个框是一个包含4个坐标和1个标签的列表。
返回:
经过编码后的边界框、对应的标签以及匹配到的边界框数量。
"""
# 内部函数,计算单个边界框与所有锚框的Jaccard相似度
def jaccard_with_anchors(bbox):
"""计算单个边界框与锚框集的Jaccard相似度"""
# 计算交集的边界和体积
ymin = np.maximum(y1, bbox[0])
xmin = np.maximum(x1, bbox[1])
ymax = np.minimum(y2, bbox[2])
xmax = np.minimum(x2, bbox[3])
w = np.maximum(xmax - xmin, 0.)
h = np.maximum(ymax - ymin, 0.)
# 计算体积
inter_vol = h * w
union_vol = vol_anchors + (bbox[2] - bbox[0]) * (bbox[3] - bbox[1]) - inter_vol
jaccard = inter_vol / union_vol
return np.squeeze(jaccard)
# 初始化分数、调整后的边界框和标签数组
pre_scores = np.zeros((8732), dtype=np.float32)
t_boxes = np.zeros((8732, 4), dtype=np.float32)
t_label = np.zeros((8732), dtype=np.int64)
# 遍历输入边界框进行编码处理
for bbox in boxes:
label = int(bbox[4]) # 获取类别标签
scores = jaccard_with_anchors(bbox) # 计算当前框与所有锚框的相似度
idx = np.argmax(scores) # 找到最匹配的锚框索引
scores[idx] = 2.0 # 标记最大相似度为2.0
mask = (scores > matching_threshold) # 生成匹配阈值以上的掩码
mask &= (scores > pre_scores) # 进一步筛选,确保新分数高于旧分数
pre_scores = np.maximum(pre_scores, scores * mask) # 更新最高分数
t_label = mask * label + (1 - mask) * t_label # 更新标签数组
# 更新调整后的边界框坐标
for i in range(4):
t_boxes[:, i] = mask * bbox[i] + (1 - mask) * t_boxes[:, i]
# 获取有效索引
index = np.nonzero(t_label)
# 调整边界框格式为tlbr(左上右下)
bboxes = np.zeros((8732, 4), dtype=np.float32)
bboxes[:, [0, 1]] = (t_boxes[:, [0, 1]] + t_boxes[:, [2, 3]]) / 2
bboxes[:, [2, 3]] = t_boxes[:, [2, 3]] - t_boxes[:, [0, 1]]
# 编码特征
bboxes_t = bboxes[index]
default_boxes_t = default_boxes[index]
bboxes_t[:, :2] = (bboxes_t[:, :2] - default_boxes_t[:, :2]) / (default_boxes_t[:, 2:] * 0.1)
tmp = np.maximum(bboxes_t[:, 2:4] / default_boxes_t[:, 2:4], 0.000001)
bboxes_t[:, 2:4] = np.log(tmp) / 0.2
bboxes[index] = bboxes_t
# 计算匹配到的边界框数量
num_match = np.array([len(np.nonzero(t_label)[0])], dtype=np.int32)
# 返回编码后的边界框、标签及匹配数
return bboxes, t_label.astype(np.int32), num_match
# 数据预处理函数,用于数据集
def preprocess_fn(img_id, image, box, is_training):
"""数据集的预处理函数。"""
cv2.setNumThreads(2)
# 内部函数,用于推理阶段的数据处理
def _infer_data(image, input_shape):
"""推理阶段图像尺寸调整和通道扩展"""
img_h, img_w, _ = image.shape
input_h, input_w = input_shape
image = cv2.resize(image, (input_w, input_h))
# 若图像为灰度图,转换为三通道图像
if len(image.shape) == 2:
image = np.expand_dims(image, axis=-1)
image = np.concatenate([image, image, image], axis=-1)
return img_id, image, np.array((img_h, img_w), np.float32)
# 内部函数,数据增强操作
def _data_aug(image, box, is_training, image_size=(300, 300)):
"""数据增强处理,包括随机裁剪、尺寸调整、翻转等"""
ih, iw, _ = image.shape
h, w = image_size
if not is_training:
return _infer_data(image, image_size)
# 随机裁剪图像和边界框
box = box.astype(np.float32)
image, box = random_sample_crop(image, box)
ih, iw, _ = image.shape
# 调整图像大小
image = cv2.resize(image, (w, h))
# 图像翻转
flip = _rand() < .5
if flip:
image = cv2.flip(image, 1, dst=None)
# 灰度图转为三通道
if len(image.shape) == 2:
image = np.expand_dims(image, axis=-1)
image = np.concatenate([image, image, image], axis=-1)
# 调整边界框坐标比例
box[:, [0, 2]] = box[:, [0, 2]] / ih
box[:, [1, 3]] = box[:, [1, 3]] / iw
if flip:
box[:, [1, 3]] = 1 - box[:, [3, 1]]
# 对边界框进行编码
box, label, num_match = ssd_bboxes_encode(box)
# 返回处理后的图像、边界框、标签及匹配数
return image, box, label, num_match
# 根据是否训练调用不同的数据处理流程
return _data_aug(image, box, is_training, image_size=[300, 300])
数据集创建
from mindspore import Tensor
from mindspore.dataset import MindDataset
from mindspore.dataset.vision import Decode, HWC2CHW, Normalize, RandomColorAdjust
def create_ssd_dataset(mindrecord_file, batch_size=32, device_num=1, rank=0,
is_training=True, num_parallel_workers=1, use_multiprocessing=True):
"""
创建用于SSD训练或检测的MindDataset数据集。
参数:
mindrecord_file (str): MindRecord文件路径。
batch_size (int): 批处理大小。
device_num (int): 设备数量,用于数据分片。
rank (int): 当前设备的排名。
is_training (bool): 是否为训练模式。如果是,数据集将进行随机打乱。
num_parallel_workers (int): 并行处理数据的工人数量。
use_multiprocessing (bool): 是否使用多进程进行数据处理。
返回:
MindDataset: 加工后的数据集。
"""
"""Create SSD dataset with MindDataset."""
# 初始化MindDataset,读取MindRecord文件中的数据
dataset = MindDataset(mindrecord_file, columns_list=["img_id", "image", "annotation"], num_shards=device_num,
shard_id=rank, num_parallel_workers=num_parallel_workers, shuffle=is_training)
# 解码图像
decode = Decode()
dataset = dataset.map(operations=decode, input_columns=["image"])
# 将图像格式从HWC转换为CHW,以满足模型输入要求
change_swap_op = HWC2CHW()
# 图像归一化,基于ImageNet的预训练模型
normalize_op = Normalize(mean=[0.485 * 255, 0.456 * 255, 0.406 * 255],
std=[0.229 * 255, 0.224 * 255, 0.225 * 255])
# 随机调整图像的亮度、对比度和饱和度,仅在训练时使用,以增加数据多样性
color_adjust_op = RandomColorAdjust(brightness=0.4, contrast=0.4, saturation=0.4)
# 定义预处理函数,根据是否为训练模式,执行不同的转换
compose_map_func = (lambda img_id, image, annotation: preprocess_fn(img_id, image, annotation, is_training))
if is_training:
# 训练模式下的输出列,包括处理后的图像、边界框、标签和匹配数量
output_columns = ["image", "box", "label", "num_match"]
# 训练时的数据转换操作
trans = [color_adjust_op, normalize_op, change_swap_op]
else:
# 非训练模式下的输出列,包括图像ID、处理后的图像和图像形状
output_columns = ["img_id", "image", "image_shape"]
# 非训练时的数据转换操作,仅包括归一化和格式转换
trans = [normalize_op, change_swap_op]
# 应用预处理函数和数据转换操作
dataset = dataset.map(operations=compose_map_func, input_columns=["img_id", "image", "annotation"],
output_columns=output_columns, python_multiprocessing=use_multiprocessing,
num_parallel_workers=num_parallel_workers)
# 应用图像格式转换和归一化操作
dataset = dataset.map(operations=trans, input_columns=["image"], python_multiprocessing=use_multiprocessing,
num_parallel_workers=num_parallel_workers)
# 将数据集分批,用于模型训练
dataset = dataset.batch(batch_size, drop_remainder=True)
return dataset
损失函数
段落里面写公式好看,但是导入CSDN好难受啊,只能插图片
详细解释看图
GT的中心坐标和宽高是不能直接参与训练的。要先把他们转化(编码)为相对于anchor的偏置量,使用偏移量参加训练,来计算loss。预测框的坐标形式是高度宽度中心点,而真实框坐标形式是左上角右下角,回归参数是预测框到真实框之间的差距,所以会有很多步骤是两种坐标形式的变换。
def class_loss(logits, label):
"""
计算类别损失。
此函数计算模型输出的logits与真实标签之间的焦点损失,适用于处理分类任务中样本不平衡的问题,
能够调整易分类样本和难分类样本在损失中的比重。
参数:
logits: Tensor, 模型输出的未归一化对数概率值。
label: Tensor, 真实标签,形状应与logits相同。
返回:
Tensor, 计算得到的焦点损失值。
"""
"""计算类别损失。"""
# 将标签转换为one-hot编码形式
label = ops.one_hot(label, ops.shape(logits)[-1], Tensor(1.0, ms.float32), Tensor(0.0, ms.float32))
# 初始化所有元素的权重为1,用于后续损失计算
weight = ops.ones_like(logits)
# 初始化正例权重为全1,用于调整正负样本在损失中的比重
pos_weight = ops.ones_like(logits)
# 使用logits和转换后的标签、权重及正例权重计算带logits的二元交叉熵损失
sigmiod_cross_entropy = ops.binary_cross_entropy_with_logits(logits, label, weight.astype(ms.float32), pos_weight.astype(ms.float32))
# 计算logits的sigmoid值
sigmoid = ops.sigmoid(logits)
# 将标签转换为float类型,以便进行后续的浮点运算
label = label.astype(ms.float32)
# 计算p_t,即正确分类的概率
p_t = label * sigmoid + (1 - label) * (1 - sigmoid)
# 计算调制因子,用于调整易分类与难分类样本的损失比重
modulating_factor = ops.pow(1 - p_t, 2.0)
# 计算α权重因子,用于调整正负样本之间的相对权重
alpha_weight_factor = label * 0.75 + (1 - label) * (1 - 0.75)
# 结合调制因子和α权重因子计算最终的焦点损失
focal_loss = modulating_factor * alpha_weight_factor * sigmiod_cross_entropy
# 返回焦点损失值
return focal_loss
Metrics
在SSD中,训练过程是不需要用到非极大值抑制(NMS),但当进行检测时,例如输入一张图片要求输出框的时候,需要用到NMS过滤掉那些重叠度较大的预测框。
非极大值抑制的流程如下:
- 根据置信度得分进行排序
- 选择置信度最高的比边界框添加到最终输出列表中,将其从边界框列表中删除
- 计算所有边界框的面积
- 计算置信度最高的边界框与其它候选框的IoU
- 删除IoU大于阈值的边界框
- 重复上述过程,直至边界框列表为空
# 导入必要的库
import json
from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval
# 定义apply_eval函数,用于执行模型评估
def apply_eval(eval_param_dict):
"""
根据提供的参数字典执行模型评估流程。
参数:
eval_param_dict: 包含评估所需各项参数的字典,包括模型、数据集以及标注信息等。
返回:
评估后的各项指标。
"""
# 获取并设置模型为评估模式
net = eval_param_dict["net"]
net.set_train(False)
# 获取数据集和标注文件路径
ds = eval_param_dict["dataset"]
anno_json = eval_param_dict["anno_json"]
# 初始化COCO评估工具
coco_metrics = COCOMetrics(
anno_json=anno_json,
classes=train_cls, # 类别信息应提前定义
num_classes=81,
max_boxes=100,
nms_threshold=0.6,
min_score=0.1
)
# 遍历数据集进行预测与评估
for data in ds.create_dict_iterator(output_numpy=True, num_epochs=1):
img_id = data['img_id']
img_np = data['image']
image_shape = data['image_shape']
# 模型前向传播得到预测结果
output = net(Tensor(img_np))
# 针对每张图片的预测结果进行处理
for batch_idx in range(img_np.shape[0]):
pred_data = {
"boxes": output[0].asnumpy()[batch_idx],
"box_scores": output[1].asnumpy()[batch_idx],
"img_id": int(np.squeeze(img_id[batch_idx])),
"image_shape": image_shape[batch_idx]
}
# 更新评估指标
coco_metrics.update(pred_data)
# 获取最终评估结果
eval_metrics = coco_metrics.get_metrics()
return eval_metrics
# 定义apply_nms函数,实现非极大值抑制算法
def apply_nms(all_boxes, all_scores, thres, max_boxes):
"""
应用非极大值抑制(NMS)方法对预测框进行筛选。
参数:
all_boxes: 所有预测框的坐标数组。
all_scores: 预测框的分数数组。
thres: 重叠度阈值,用于决定是否保留预测框。
max_boxes: 最多保留的预测框数量。
返回:
保留下来的预测框索引列表。
"""
# 计算各框的面积
y1, x1, y2, x2 = all_boxes[:, 0], all_boxes[:, 1], all_boxes[:, 2], all_boxes[:, 3]
areas = (x2 - x1 + 1) * (y2 - y1 + 1)
# 按分数降序排序
order = all_scores.argsort()[::-1]
keep = [] # 用于存储保留的框的索引
while order.size > 0:
# 取当前最高分框的索引
i = order[0]
keep.append(i)
# 达到最大保留数量则结束
if len(keep) >= max_boxes:
break
# 计算当前框与其他框的交集部分
xx1 = np.maximum(x1[i], x1[order[1:]])
yy1 = np.maximum(y1[i], y1[order[1:]])
xx2 = np.minimum(x2[i], x2[order[1:]])
yy2 = np.minimum(y2[i], y2[order[1:]])
w = np.maximum(0.0, xx2 - xx1 + 1)
h = np.maximum(0.0, yy2 - yy1 + 1)
inter = w * h
# 计算重叠度
ovr = inter / (areas[i] + areas[order[1:]] - inter)
# 保留重叠度小于阈值的框
inds = np.where(ovr <= thres)[0]
# 更新待处理框的索引
order = order[inds + 1]
return keep
class COCOMetrics:
"""
类COCOMetrics用于计算预测边界框的mAP(平均精度均值)。
方法:
__init__: 初始化COCOMetrics实例,设置各类参数及加载COCO标注信息。
update: 根据预测结果更新内部存储的预测信息。
get_metrics: 计算并返回评估指标,包括mAP。
"""
def __init__(self, anno_json, classes, num_classes, min_score, nms_threshold, max_boxes):
# 初始化参数,加载类别信息和COCO标注文件
self.num_classes = num_classes
self.classes = classes
self.min_score = min_score
self.nms_threshold = nms_threshold
self.max_boxes = max_boxes
self.val_cls_dict = dict(enumerate(classes))
self.coco_gt = COCO(anno_json)
self.class_dict = {cat['name']: cat['id'] for cat in self.coco_gt.loadCats(self.coco_gt.getCatIds())}
self.predictions = [] # 存储预测结果
self.img_ids = [] # 存储图片ID
def update(self, batch):
# 根据输入的预测批次更新预测信息
pred_boxes, box_scores, img_id, h, w = batch['boxes'], batch['box_scores'], batch['img_id'], batch['image_shape'][0], batch['image_shape'][1]
# 对每个类别应用NMS,整合预测框、标签和分数
for c in range(1, self.num_classes):
# 筛选高于阈值的预测框
valid_boxes, valid_scores = self.filter_by_score(pred_boxes, box_scores[:, c], self.min_score, h, w)
# 应用NMS
nms_boxes = apply_nms(valid_boxes, valid_scores, self.nms_threshold, self.max_boxes)
# 封装预测结果
self.package_predictions(nms_boxes, valid_scores, img_id, c)
self.img_ids.append(img_id)
def get_metrics(self):
# 将预测结果保存至文件,并使用COCO API计算mAP
with open('predictions.json', 'w') as f:
json.dump(self.predictions, f)
coco_dt = self.coco_gt.loadRes('predictions.json')
evaluator = COCOeval(self.coco_gt, coco_dt, iouType='bbox')
evaluator.params.imgIds = self.img_ids
evaluator.evaluate()
evaluator.accumulate()
evaluator.summarize()
# 返回mAP
return evaluator.stats[0]
class SsdInferWithDecoder(nn.Cell):
"""
类SsdInferWithDecoder用于SSD模型推理,包含解码预测边界框的功能。
方法:
__init__: 初始化网络、默认边界框和检查点路径。
construct: 构建函数,执行网络推理并解码预测的边界框。
"""
def __init__(self, network, default_boxes, ckpt_path):
# 加载模型参数,初始化网络和解码参数
super(SsdInferWithDecoder, self).__init__()
param_dict = ms.load_checkpoint(ckpt_path)
ms.load_param_into_net(network, param_dict)
self.network = network
self.default_boxes = default_boxes
self.prior_scaling_xy = 0.1
self.prior_scaling_wh = 0.2
def construct(self, x):
# 执行网络推理,解码预测的边界框位置
pred_loc, pred_label = self.network(x)
decoded_boxes = self.decode_boxes(pred_loc, self.default_boxes)
return decoded_boxes, pred_label
训练过程
(1)先验框匹配
在训练过程中,首先要确定训练图片中的ground truth(真实目标)与哪个先验框来进行匹配,与之匹配的先验框所对应的边界框将负责预测它。
SSD的先验框与ground truth的匹配原则主要有两点:
- 对于图片中每个ground truth,找到与其IOU最大的先验框,该先验框与其匹配,这样可以保证每个ground truth一定与某个先验框匹配。通常称与ground truth匹配的先验框为正样本,反之,若一个先验框没有与任何ground truth进行匹配,那么该先验框只能与背景匹配,就是负样本。
- 对于剩余的未匹配先验框,若某个ground truth的IOU大于某个阈值(一般是0.5),那么该先验框也与这个ground truth进行匹配。尽管一个ground truth可以与多个先验框匹配,但是ground truth相对先验框还是太少了,所以负样本相对正样本会很多。为了保证正负样本尽量平衡,SSD采用了hard negative mining,就是对负样本进行抽样,抽样时按照置信度误差(预测背景的置信度越小,误差越大)进行降序排列,选取误差的较大的top-k作为训练的负样本,以保证正负样本比例接近1:3。
注意点:
- 通常称与gt匹配的prior为正样本,反之,若某一个prior没有与任何一个gt匹配,则为负样本。
- 某个gt可以和多个prior匹配,而每个prior只能和一个gt进行匹配。
- 如果多个gt和某一个prior的IOU均大于阈值,那么prior只与IOU最大的那个进行匹配。
如上图所示,训练过程中的 prior boxes 和 ground truth boxes 的匹配,基本思路是:让每一个 prior box 回归并且到 ground truth box,这个过程的调控我们需要损失层的帮助,他会计算真实值和预测值之间的误差,从而指导学习的走向。
(2)损失函数
损失函数使用的是上文提到的位置损失函数和置信度损失函数的加权和。
(3)数据增强
使用之前定义好的数据增强方式,对创建好的数据增强方式进行数据增强。
模型训练时,设置模型训练的epoch次数为60,然后通过create_ssd_dataset类创建了训练集和验证集。batch_size大小为5,图像尺寸统一调整为300×300。损失函数使用位置损失函数和置信度损失函数的加权和,优化器使用Momentum,并设置初始学习率为0.001。回调函数方面使用了LossMonitor和TimeMonitor来监控训练过程中每个epoch结束后,损失值Loss的变化情况以及每个epoch、每个step的运行时间。设置每训练10个epoch保存一次模型。
import math
import itertools as it
from mindspore import set_seed
class GeneratDefaultBoxes():
"""
该类用于生成SSD中的默认框(Default Boxes)。遵循(W, H, anchor_sizes)的顺序,
其中`self.default_boxes`形状为[anchor_sizes, H, W, 4],最后一维代表[y, x, h, w]。
而`self.default_boxes_tlbr`形状与`self.default_boxes`相同,但最后一维为[y1, x1, y2, x2],
即边界框的左上和右下坐标。
"""
def __init__(self):
# 计算特征图上每个cell的步长
fk = 300 / np.array([8, 16, 32, 64, 100, 300])
# 计算不同特征层上的尺度因子
scale_rate = (0.95 - 0.1) / (len([4, 6, 6, 6, 4, 4]) - 1)
scales = [0.1 + scale_rate * i for i in range(len([4, 6, 6, 6, 4, 4]))] + [1.0]
# 初始化默认框列表
self.default_boxes = []
# 遍历每个特征图尺寸
for idex, feature_size in enumerate([38, 19, 10, 5, 3, 1]):
sk1, sk2 = scales[idex], scales[idex + 1]
sk3 = math.sqrt(sk1 * sk2) # 计算中间尺度
# 特定处理第一个特征层或根据预设的宽高比生成不同大小的默认框
if idex == 0 and not [[2], [2, 3], [2, 3], [2, 3], [2], [2]][idex]:
sizes = [(0.1, 0.1), (sk1 * math.sqrt(2), sk1 / math.sqrt(2)), (sk1 / math.sqrt(2), sk1 * math.sqrt(2))]
else:
sizes = [(sk1, sk1)]
for ar in [[2], [2, 3], [2, 3], [2, 3], [2], [2]][idex]:
w, h = sk1 * math.sqrt(ar[0]), sk1 / math.sqrt(ar[0])
sizes.extend([(w, h), (h, w)])
sizes.append((sk3, sk3)) # 添加中间尺度的正方形框
# 确保每个特征层的默认框数量正确
assert len(sizes) == [4, 6, 6, 6, 4, 4][idex]
# 为每个cell位置生成默认框
for i, j in it.product(range(feature_size), repeat=2):
for w, h in sizes:
cx, cy = (j + 0.5) / fk[idex], (i + 0.5) / fk[idex] # 中心坐标归一化
self.default_boxes.append([cy, cx, h, w])
# 定义转换函数,将中心坐标形式转为左上右下坐标形式
def to_tlbr(cy, cx, h, w):
return cy - h / 2, cx - w / 2, cy + h / 2, cx + w / 2
# 转换默认框坐标形式并存储
self.default_boxes_tlbr = np.array(list(map(to_tlbr, *zip(*self.default_boxes))), dtype='float32')
self.default_boxes = np.array(self.default_boxes, dtype='float32')
# 实例化并获取默认框的两种表示形式
default_boxes_tlbr = GeneratDefaultBoxes().default_boxes_tlbr
default_boxes = GeneratDefaultBoxes().default_boxes
# 分割坐标并计算每个框的体积(用于后续的IoU计算)
y1, x1, y2, x2 = np.split(default_boxes_tlbr[:, :4], 4, axis=-1)
vol_anchors = (x2 - x1) * (y2 - y1)
# 设置匹配阈值,用于确定预测框与默认框之间的IoU匹配条件
matching_threshold = 0.5
from mindspore.common.initializer import initializer, TruncatedNormal
def init_net_param(network, initialize_mode='TruncatedNormal'):
"""初始化网络中的参数。
参数:
network: 待初始化参数的网络对象。
initialize_mode: 初始化方法,默认为'TruncatedNormal'。
功能:
根据指定的初始化方式对网络参数进行初始化,不包括名称中包含'beta'、'gamma'和'bias'的参数。
"""
# 获取网络中所有可训练参数
params = network.trainable_params()
# 遍历参数列表
for param in params:
# 排除特定命名的参数
if 'beta' not in param.name and 'gamma' not in param.name and 'bias' not in param.name:
# 根据初始化模式设置参数值
if initialize_mode == 'TruncatedNormal':
# 使用截断正态分布初始化
param.set_data(initializer(TruncatedNormal(0.02), param.data.shape, param.data.dtype))
else:
# 使用其他初始化方法
param.set_data(initialize_mode, param.data.shape, param.data.dtype)
def get_lr(global_step, lr_init, lr_end, lr_max, warmup_epochs, total_epochs, steps_per_epoch):
"""生成学习率数组。
参数:
global_step: 当前全局步数。
lr_init: 初始学习率。
lr_end: 最终学习率。
lr_max: 峰值学习率。
warmup_epochs: 学习率预热期的轮数。
total_epochs: 总训练轮数。
steps_per_epoch: 每轮的步数。
功能:
生成一个随训练步数变化的学习率数组,包含预热阶段线性增加和余下阶段余弦退火的学习率变化。
"""
# 初始化学习率列表
lr_each_step = []
# 计算总步数
total_steps = steps_per_epoch * total_epochs
# 计算预热步数
warmup_steps = steps_per_epoch * warmup_epochs
# 根据步数生成对应学习率
for step in range(total_steps):
# 预热阶段
if step < warmup_steps:
lr = lr_init + (lr_max - lr_init) * step / warmup_steps
# 余弦退火阶段
else:
lr = lr_end + (lr_max - lr_end) * (1. + math.cos(math.pi * (step - warmup_steps) / (total_steps - warmup_steps))) / 2.
# 确保学习率非负
lr = max(lr, 0.0)
# 添加到学习率列表
lr_each_step.append(lr)
# 根据当前全局步数获取接下来的学习率序列
current_step = global_step
lr_each_step = np.array(lr_each_step).astype(np.float32)
learning_rate = lr_each_step[current_step:]
return learning_rate
import mindspore.dataset as ds
ds.config.set_enable_shared_mem(False)
import time
from mindspore.amp import DynamicLossScaler
# 设置随机种子以确保实验可复现性
set_seed(1)
# 数据加载
# 准备MindRecord格式的数据集目录和文件路径
mindrecord_dir = "./datasets/MindRecord_COCO"
mindrecord_file = "./datasets/MindRecord_COCO/ssd.mindrecord0"
# 创建数据集加载器,设定批量大小、进程排名、是否使用多进程
dataset = create_ssd_dataset(mindrecord_file, batch_size=5, rank=0, use_multiprocessing=True)
# 获取数据集的大小,即总的迭代次数
dataset_size = dataset.get_dataset_size()
# 从数据集中获取一个批次的数据作为示例
image, get_loc, gt_label, num_matched_boxes = next(dataset.create_tuple_iterator())
# 网络结构定义与参数初始化
network = SSD300Vgg16() # SSD300模型基于VGG16架构
init_net_param(network, initialize_mode='TruncatedNormal') # 初始化网络参数
# 学习率策略定义
lr = Tensor(get_lr(global_step=0 * dataset_size, # 初始全局步数
lr_init=0.001, lr_end=0.001 * 0.05, lr_max=0.05, # 初始学习率、最小学习率、最大学习率
warmup_epochs=2, total_epochs=60, steps_per_epoch=dataset_size)) # 预热期、总训练轮次、每轮步数
# 优化器配置
opt = nn.Momentum(filter(lambda x: x.requires_grad, network.get_parameters()), lr, # 只优化需梯度更新的参数
momentum=0.9, dampening=0.00015, nesterov=True) # 动量优化器参数
# 定义前向传播过程
def forward_fn(x, gt_loc, gt_label, num_matched_boxes):
pred_loc, pred_label = network(x)
mask = ops.less(0, gt_label).astype(ms.float32) # 生成有效样本的掩码
num_matched_boxes_sum = ops.sum(num_matched_boxes.astype(ms.float32)) # 匹配到框的总数
# 计算定位损失(Smooth L1 Loss)
mask_loc_expanded = ops.tile(ops.expand_dims(mask, -1), (1, 1, 4)) # 扩展掩码以匹配定位输出维度
smooth_l1_loss = nn.SmoothL1Loss()(pred_loc, gt_loc) * mask_loc_expanded
loss_loc = ops.sum(ops.sum(smooth_l1_loss, -1), -1) # 求和得到总体定位损失
# 计算分类损失
loss_cls = class_loss(pred_label, gt_label) # 假设class_loss为分类损失函数
loss_cls = ops.sum(loss_cls, (1, 2)) # 沿着类别维度求和
# 综合损失,除以匹配到的框数以平均化损失
total_loss = (loss_cls + loss_loc) / num_matched_boxes_sum
return ops.sum(total_loss)
# 自动微分与梯度缩放配置
grad_fn = ms.value_and_grad(forward_fn, None, opt.parameters, has_aux=False)
loss_scaler = DynamicLossScaler(1024, 2, 1000) # 动态损失缩放策略
# 单步训练过程
def train_step(x, gt_loc, gt_label, num_matched_boxes):
scaled_loss, grads = grad_fn(x, gt_loc, gt_label, num_matched_boxes)
unscaled_loss = loss_scaler.unscale(scaled_loss)
grads = loss_scaler.unscale(grads)
opt(grads)
return unscaled_loss
# 训练循环
print("=================== 开始训练 =====================")
for epoch in range(60): # 总共训练60个epoch
network.set_train(True) # 设置网络为训练模式
start_time = time.time()
for step, (image, get_loc, gt_label, num_matched_boxes) in enumerate(dataset.create_tuple_iterator()):
loss = train_step(image, get_loc, gt_label, num_matched_boxes)
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Epoch:[{epoch + 1:03d}/{60}], "
f"loss:{loss.asnumpy():.4f} , "
f"time:{elapsed_time:.2f}s ")
# 保存模型
ms.save_checkpoint(network, "ssd-60_9.ckpt")
print("=================== 训练完成 =====================")
评估
自定义eval_net()类对训练好的模型进行评估,调用了上述定义的SsdInferWithDecoder类返回预测的坐标及标签,然后分别计算了在不同的IoU阈值、area和maxDets设置下的Average Precision(AP)和Average Recall(AR)。使用COCOMetrics类计算mAP。模型在测试集上的评估指标如下。
精确率(AP)和召回率(AR)的解释
- TP:IoU>设定的阈值的检测框数量(同一Ground Truth只计算一次)。
- FP:IoU<=设定的阈值的检测框,或者是检测到同一个GT的多余检测框的数量。
- FN:没有检测到的GT的数量。
精确率(AP)和召回率(AR)的公式
- 精确率(Average Precision,AP):
精确率是将正样本预测正确的结果与正样本预测的结果和预测错误的结果的和的比值,主要反映出预测结果错误率。
- 召回率(Average Recall,AR):
召回率是正样本预测正确的结果与正样本预测正确的结果和正样本预测错误的和的比值,主要反映出来的是预测结果中的漏检率。
关于以下代码运行结果的输出指标
- 第一个值即为mAP(mean Average Precision), 即各类别AP的平均值。
- 第二个值是iou取0.5的mAP值,是voc的评判标准。
- 第三个值是评判较为严格的mAP值,可以反应算法框的位置精准程度;中间几个数为物体大小的mAP值。
对于AR看一下maxDets=10/100的mAR值,反应检出率,如果两者接近,说明对于这个数据集来说,不用检测出100个框,可以提高性能。
# 定义MindRecord文件路径
mindrecord_file = "./datasets/MindRecord_COCO/ssd_eval.mindrecord0"
def ssd_eval(dataset_path, ckpt_path, anno_json):
"""
对SSD模型进行评估。
参数:
dataset_path: 数据集路径,用于模型评估。
ckpt_path: 模型检查点路径,用于加载训练好的模型。
anno_json: 注释JSON文件路径,包含评估数据集的标注信息。
"""
# 设置评估数据集的批处理大小
batch_size = 1
# 创建SSD数据集,用于模型评估
ds = create_ssd_dataset(dataset_path, batch_size=batch_size,
is_training=False, use_multiprocessing=False)
# 初始化SSD300Vgg16网络模型
network = SSD300Vgg16()
# 加载模型检查点
print("Load Checkpoint!")
net = SsdInferWithDecoder(network, Tensor(default_boxes), ckpt_path)
# 设置模型为评估模式
net.set_train(False)
# 计算评估数据集的总图像数量
total = ds.get_dataset_size() * batch_size
# 打印评估开始信息和数据集总图像数量
print("\n========================================\n")
print("total images num: ", total)
# 执行模型评估
eval_param_dict = {"net": net, "dataset": ds, "anno_json": anno_json}
mAP = apply_eval(eval_param_dict)
# 打印评估结果
print("\n========================================\n")
print(f"mAP: {mAP}")
def eval_net():
"""
启动模型评估流程。
"""
print("Start Eval!")
# 调用ssd_eval函数进行模型评估
ssd_eval(mindrecord_file, "./ssd-60_9.ckpt", anno_json)
# 执行模型评估
eval_net()
打卡图片:
关于SSD的内容还有很多很多,此处只来得及记录了一部分,更多的请详见更多大佬的博客内容
参考博客:
2.1SSD算法理论_哔哩哔哩_bilibili
SSD算法详解-CSDN博客
SSD原理解读-从入门到精通_ssd算法原理-CSDN博客
SSD的损失函数设计_ssd损失函数的方法有哪些-CSDN博客
Smooth L1 Loss(Huber):pytorch中的计算原理及使用问题_smooth huber loss-CSDN博客
图片均来自以上大佬博客或视频及昇思api。