Table of Contents
I. Principles
II. Source Code Walkthrough
1. Top-level config file configs/cascade_rcnn/cascade_mask_rcnn_r50_fpn_1x_coco.py
2. Model config dict ../_base_/models/cascade_mask_rcnn_r50_fpn.py
3. Building the model from the detector class CascadeRCNN
4. backbone (ResNet)
5. neck (FPN)
6. rpn_head (RPNHead)
7. roi_head (CascadeRoIHead)
I. Principles
"Cascade R-CNN: High Quality Object Detection and Instance Segmentation"
The paper proposes a cascade structure that improves both detection and instance segmentation (roughly a 2-4 AP gain). Although it appeared in 2018, when I surveyed table-parsing work in 2021 many networks were still using the cascade idea, so it clearly works. Object detection has two families, two-stage and one-stage. Cascade R-CNN cascades several classification-regression steps in the second stage, turning the 1+1 stages into 1+3 stages, and uses a proposal-resampling mechanism to raise the IoU threshold step by step. Each stage generates proposals better suited to the next, progressively improving detection quality. If you instead raise threshold one directly in the 1+1 setup, the later step does see higher-quality inputs during training, but the number of positives shrinks and the mismatch problem worsens, ultimately causing overfitting.
To quote another reader's comment: detection is not really a well-posed classification problem; there is no clear, discrete definition of positive and negative samples, only a continuous one via IoU. But IoU is a metric that is hard to optimize by gradient descent, and although there has been prior work on IoU losses, the results were not ideal. Cascade R-CNN is a good attempt in this direction.
Under the cascade scheme:
1) Within a single stage, a higher threshold yields higher-quality output boxes but fewer of them. As plot (d) shows, raising the threshold directly makes performance worse.
2) The output IoU is generally higher than the input IoU: the colored curves lie above the gray diagonal, so every stage improves localization internally.
3) In the 3-stage cascade, output IoU is positively correlated with input IoU (the curves rise): the better the input proposals, the better the final output boxes.
4) No single threshold is best for all boxes; no curve stays above the other two everywhere. At low input IoU, the lower threshold u=0.5 (blue curve) gives the best output; at high input IoU, the higher threshold u=0.7 (red curve) does.
5) Across the cascade, each stage's output feeds the next stage: input quality improves stage by stage, the threshold rises with it, and output quality keeps improving.
Comparison of several multi-stage architectures:
a) The classic two-stage Faster R-CNN.
b) The Cascade method proposed in this paper, 1+3=4 stages in total: one RPN plus three detectors (with thresholds 0.5/0.6/0.7). The final cls score is the average over the three stages, while the bounding box regression is taken directly from the last stage.
c) Iterative BBox shares H1 and the 0.5 threshold across stages. Problems: a single threshold handles boxes of different input IoU unevenly, and as the input/output IoU plot shows, the output distribution has already shifted, so reusing the same H is inappropriate. The paper includes a scatter plot illustrating this.
d) Integral Loss shares the pooling but uses different heads and thresholds. Problems: the first stage overfits, and a single shared pooling again does not suit the multi-stage distributions.
In short, the empirical conclusion is that 3 pooling ops + 3 distinct heads works best.
Model size, speed, and AP:
With the cascade added, the model grows and AP improves while speed is only mildly affected. It suits scenarios with enough compute and no strict latency requirement.
A few related concepts:
two-stage: an RPN first produces proposals, which are then classified and regressed into more precise boxes; slower but more accurate, e.g. R-CNN, Fast R-CNN, Faster R-CNN, Mask R-CNN.
one-stage: predicts boxes directly; faster, e.g. SSD, YOLO.
IoU thresholds: two-stage algorithms involve two IoU thresholds. During training, threshold one separates positives from negatives, selecting a subset of proposals to control the positive/negative ratio; during evaluation, threshold two is used to compute mAP.
mismatch: the train/test discrepancy. The proposals used during training are filtered to control the positive/negative ratio (positives and negatives are defined by IoU with the gt; the threshold is a hyperparameter, typically 0.5), whereas at test time all proposals from the preceding step are used.
Recommended reading:
Mask R-CNN principles:
https://blog.csdn.net/remanented/article/details/79564045
RPN(Region Proposal Network)和 Anchor 理解_梦星魂24的博客-CSDN博客_anchor network
II. Source Code Walkthrough
1. Top-level config file configs/cascade_rcnn/cascade_mask_rcnn_r50_fpn_1x_coco.py
_base_ = [
'../_base_/models/cascade_mask_rcnn_r50_fpn.py',
'../_base_/datasets/coco_instance.py',
'../_base_/schedules/schedule_1x.py', '../_base_/default_runtime.py'
]
The four _base_ files supply, respectively, the model, the dataset, the optimizer/LR schedule, and the runtime defaults.
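As a quick sanity check, mmcv's Config.fromfile merges the _base_ files into one dict-like object (a minimal sketch; it assumes you run from the mmdetection repo root):

from mmcv import Config

# Load the top-level config; the _base_ files are merged recursively.
cfg = Config.fromfile('configs/cascade_rcnn/cascade_mask_rcnn_r50_fpn_1x_coco.py')
print(cfg.model.type)                 # 'CascadeRCNN'
print(cfg.model.roi_head.num_stages)  # 3
print(cfg.data.train.type)            # 'CocoDataset'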
2. Model config dict ../_base_/models/cascade_mask_rcnn_r50_fpn.py
This file holds the config of every model module plus the training and testing settings; each type field in the config names the concrete class to be instantiated:
backbone (ResNet) + neck (FPN) + rpn_head (RPNHead) + roi_head (CascadeRoIHead)
Building a model takes three ingredients: the model config dict, a detector class under mmdet/models/detectors, and mmcv's build_from_cfg. Their relationship is covered in the link below. In short: the detector class is a class, the config dict supplies that class's constructor arguments, and build_from_cfg performs the instantiation.
mmdetection之model构建_武乐乐~的博客-CSDN博客_build_from_cfg
For a detailed reading of the config dict, see:
Mask RCNN之mmdetection配置文件解读 - 简书
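To make the relationship concrete, here is a stripped-down sketch of what build_from_cfg boils down to (the real mmcv version also merges default args and does error handling; build_from_cfg_sketch is a hypothetical name):

# Simplified sketch of mmcv's build_from_cfg: pop 'type', look the class up
# in a registry, and call it with the remaining keys as constructor kwargs.
def build_from_cfg_sketch(cfg, registry):
    args = dict(cfg)
    obj_type = args.pop('type')       # e.g. 'CascadeRCNN'
    obj_cls = registry.get(obj_type)  # the class registered under that name
    return obj_cls(**args)            # instantiate with the remaining config keys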
# model settings
model = dict(
type='CascadeRCNN', # detector class
backbone=dict(
type='ResNet', # backbone class
depth=50, # ResNet-50 as the backbone
num_stages=4,
out_indices=(0, 1, 2, 3), # all 4 ResNet-50 stages feed the FPN, at /4, /8, /16, /32 resolution
frozen_stages=1, # freeze stage 1 and everything before it; -1 means update all parameters
norm_cfg=dict(type='BN', requires_grad=True),
norm_eval=True,
style='pytorch',
init_cfg=dict(type='Pretrained', checkpoint='torchvision://resnet50')),
neck=dict(
type='FPN',
in_channels=[256, 512, 1024, 2048], # ResNet-50 output channels become the FPN input channels
out_channels=256, # 1x1 convs unify the channels to 256; smaller maps are upsampled and added
num_outs=5), # note: 4 input levels but 5 outputs; an extra /64 level is produced from the /32 level by stride-2 max pooling. All levels feed the RPN.
rpn_head=dict(
type='RPNHead',
in_channels=256,
feat_channels=256,
anchor_generator=dict(
type='AnchorGenerator',
scales=[8], # with the 3 ratios this gives 1x3=3 base anchors per location, all of equal area
ratios=[0.5, 1.0, 2.0], # height/width ratios; with scale 8, ratio 1.0 gives a square anchor of side 8 x stride
strides=[4, 8, 16, 32, 64]), # anchor strides of the 5 levels, i.e. the downsampling factor w.r.t. the input image
bbox_coder=dict(
type='DeltaXYWHBBoxCoder', # deltas over (x, y, w, h)
target_means=[.0, .0, .0, .0],
target_stds=[1.0, 1.0, 1.0, 1.0]),
loss_cls=dict(
type='CrossEntropyLoss', use_sigmoid=True, loss_weight=1.0), # classification loss
loss_bbox=dict(type='SmoothL1Loss', beta=1.0 / 9.0, loss_weight=1.0)), # regression loss
roi_head=dict(
type='CascadeRoIHead',
num_stages=3, # 3 cascade stages
stage_loss_weights=[1, 0.5, 0.25],
bbox_roi_extractor=dict(
type='SingleRoIExtractor',
roi_layer=dict(type='RoIAlign', output_size=7, sampling_ratio=0),
out_channels=256,
featmap_strides=[4, 8, 16, 32]), # RoI Align: each RoI is mapped to a feature level according to its size
bbox_head=[ # the 3 cascade heads are essentially identical; only target_stds differs
dict(
type='Shared2FCBBoxHead',
in_channels=256,
fc_out_channels=1024,
roi_feat_size=7,
num_classes=80,
bbox_coder=dict(
type='DeltaXYWHBBoxCoder',
target_means=[0., 0., 0., 0.],
target_stds=[0.1, 0.1, 0.2, 0.2]), # predicted dx/dy are scaled by 0.1 and dw/dh by 0.2 when decoding
reg_class_agnostic=True,
loss_cls=dict(
type='CrossEntropyLoss',
use_sigmoid=False,
loss_weight=1.0),
loss_bbox=dict(type='SmoothL1Loss', beta=1.0,
loss_weight=1.0)),
dict(
type='Shared2FCBBoxHead',
in_channels=256,
fc_out_channels=1024,
roi_feat_size=7,
num_classes=80,
bbox_coder=dict(
type='DeltaXYWHBBoxCoder',
target_means=[0., 0., 0., 0.],
target_stds=[0.05, 0.05, 0.1, 0.1]),
reg_class_agnostic=True,
loss_cls=dict(
type='CrossEntropyLoss',
use_sigmoid=False,
loss_weight=1.0),
loss_bbox=dict(type='SmoothL1Loss', beta=1.0,
loss_weight=1.0)),
dict(
type='Shared2FCBBoxHead',
in_channels=256,
fc_out_channels=1024,
roi_feat_size=7,
num_classes=80,
bbox_coder=dict(
type='DeltaXYWHBBoxCoder',
target_means=[0., 0., 0., 0.],
target_stds=[0.033, 0.033, 0.067, 0.067]),
reg_class_agnostic=True,
loss_cls=dict(
type='CrossEntropyLoss',
use_sigmoid=False,
loss_weight=1.0),
loss_bbox=dict(type='SmoothL1Loss', beta=1.0, loss_weight=1.0))
],
mask_roi_extractor=dict( # the mask branch is cascaded too, but all stages are identical, so the config is not a list
type='SingleRoIExtractor',
roi_layer=dict(type='RoIAlign', output_size=14, sampling_ratio=0), # RoIAlign output size is 14x14
out_channels=256,
featmap_strides=[4, 8, 16, 32]),
mask_head=dict(
type='FCNMaskHead', # takes the 14x14 RoI features and predicts masks with an FCN
num_convs=4,
in_channels=256,
conv_out_channels=256,
num_classes=80,
loss_mask=dict(
type='CrossEntropyLoss', use_mask=True, loss_weight=1.0))),
# model training and testing settings
train_cfg=dict(
rpn=dict(
assigner=dict( # assigner
type='MaxIoUAssigner',
pos_iou_thr=0.7, # anchors whose IoU with any gt exceeds pos_iou_thr are positive
neg_iou_thr=0.3, # anchors whose IoU with every gt is below neg_iou_thr are negative
min_pos_iou=0.3, # for each gt, if its best-overlapping box has IoU above min_pos_iou, assign the gt to that box
match_low_quality=True,
ignore_iof_thr=-1),
sampler=dict( # positive/negative sampler
type='RandomSampler',
num=256,
pos_fraction=0.5,
neg_pos_ub=-1,
add_gt_as_proposals=False),
allowed_border=0,
pos_weight=-1,
debug=False),
rpn_proposal=dict(
nms_pre=2000, # boxes kept per level before NMS after the RPN (top-k by score)
max_per_img=2000, # top-k kept after NMS
nms=dict(type='nms', iou_threshold=0.7),
min_bbox_size=0), # filter out boxes that are too small
rcnn=[
dict(
assigner=dict(
type='MaxIoUAssigner',
pos_iou_thr=0.5,
neg_iou_thr=0.5,
min_pos_iou=0.5,
match_low_quality=False,
ignore_iof_thr=-1),
sampler=dict(
type='RandomSampler',
num=512,
pos_fraction=0.25,
neg_pos_ub=-1,
add_gt_as_proposals=True),
mask_size=28,
pos_weight=-1,
debug=False),
dict(
assigner=dict(
type='MaxIoUAssigner',
pos_iou_thr=0.6,
neg_iou_thr=0.6,
min_pos_iou=0.6,
match_low_quality=False,
ignore_iof_thr=-1),
sampler=dict(
type='RandomSampler',
num=512,
pos_fraction=0.25,
neg_pos_ub=-1,
add_gt_as_proposals=True),
mask_size=28,
pos_weight=-1,
debug=False),
dict(
assigner=dict(
type='MaxIoUAssigner',
pos_iou_thr=0.7,
neg_iou_thr=0.7,
min_pos_iou=0.7,
match_low_quality=False,
ignore_iof_thr=-1),
sampler=dict(
type='RandomSampler',
num=512,
pos_fraction=0.25,
neg_pos_ub=-1,
add_gt_as_proposals=True),
mask_size=28,
pos_weight=-1,
debug=False)
]),
test_cfg=dict(
rpn=dict(
nms_pre=1000,
max_per_img=1000,
nms=dict(type='nms', iou_threshold=0.7),
min_bbox_size=0), # 1000 RoIs are fed to the bbox branch
rcnn=dict(
score_thr=0.05,
nms=dict(type='nms', iou_threshold=0.5),
max_per_img=100,
mask_thr_binary=0.5))) # the 100 boxes predicted by the bbox branch are fed to the mask branch
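Before moving on to anchors, a small illustration of the target_stds scaling mentioned above (a sketch; the box and delta values are made up). Decoding multiplies the raw head outputs by target_stds first, so later stages, whose boxes need ever finer corrections, use ever smaller stds (0.1/0.2 -> 0.05/0.1 -> 0.033/0.067):

import torch
from mmdet.core.bbox.coder import DeltaXYWHBBoxCoder

coder = DeltaXYWHBBoxCoder(target_means=(0., 0., 0., 0.),
                           target_stds=(0.1, 0.1, 0.2, 0.2))
rois = torch.tensor([[0., 0., 100., 100.]])    # one 100x100 proposal
deltas = torch.tensor([[1.0, 1.0, 1.0, 1.0]])  # raw network outputs
# decode de-normalizes first: dx = dy = 1.0 * 0.1, dw = dh = 1.0 * 0.2,
# so the center shifts by 10% of the box size and w/h scale by exp(0.2)
print(coder.decode(rois, deltas))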
For background on anchors, the following articles are helpful:
https://blog.csdn.net/u013565669/article/details/86750809
https://blog.csdn.net/yangyehuisw/article/details/105373508
https://blog.csdn.net/qq_32425195/article/details/104738634
The config has three anchor-related parameters: scales, ratios, and strides:
scales=[8], # with the 3 ratios this gives 1x3=3 base anchors per location, all of equal area
ratios=[0.5, 1.0, 2.0], # height/width ratios; with scale 8, ratio 1.0 gives a square anchor of side 8 x stride
strides=[4, 8, 16, 32, 64]), # anchor strides of the 5 levels, i.e. the downsampling factor w.r.t. the input image
Given these three parameters, anchors are computed as follows:
w = anchor_stride
h = anchor_stride
h_ratios = torch.sqrt(anchor_ratios)
w_ratios = 1 / h_ratios
# h_ratios / w_ratios == anchor_ratios
ws = w * w_ratios * anchor_scales
hs = h * h_ratios * anchor_scales
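Plugging in the first level's numbers (stride 4, scale 8, the three ratios) shows that the three base anchors share one area:

import torch

stride, scale = 4, 8.
ratios = torch.tensor([0.5, 1.0, 2.0])
h_ratios = torch.sqrt(ratios)   # [0.7071, 1.0000, 1.4142]
w_ratios = 1 / h_ratios
ws = stride * w_ratios * scale  # [45.25, 32.00, 22.63]
hs = stride * h_ratios * scale  # [22.63, 32.00, 45.25]
print(ws * hs)                  # all 1024.0 == (stride * scale) ** 2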
For the anchor-generation code, see mmdetection/mmdet/core/anchor/anchor_generator.py:
def gen_single_level_base_anchors(self,
base_size,
scales,
ratios,
center=None):
"""Generate base anchors of a single level.
Args:
base_size (int | float): Basic size of an anchor.
scales (torch.Tensor): Scales of the anchor.
ratios (torch.Tensor): The ratio between the height
and width of anchors in a single level.
center (tuple[float], optional): The center of the base anchor
related to a single feature grid. Defaults to None.
Returns:
torch.Tensor: Anchors in a single-level feature maps.
"""
w = base_size
h = base_size
if center is None:
x_center = self.center_offset * w
y_center = self.center_offset * h
else:
x_center, y_center = center
h_ratios = torch.sqrt(ratios)
w_ratios = 1 / h_ratios
if self.scale_major:
ws = (w * w_ratios[:, None] * scales[None, :]).view(-1)
hs = (h * h_ratios[:, None] * scales[None, :]).view(-1)
else:
ws = (w * scales[:, None] * w_ratios[None, :]).view(-1)
hs = (h * scales[:, None] * h_ratios[None, :]).view(-1)
# use float anchor and the anchor's center is aligned with the
# pixel center
base_anchors = [
x_center - 0.5 * ws, y_center - 0.5 * hs, x_center + 0.5 * ws,
y_center + 0.5 * hs
]
base_anchors = torch.stack(base_anchors, dim=-1)
return base_anchors
The implementation of gen_single_level_base_anchors is essentially the same as the formula above. The .py file documents the config parameters as follows:
strides (list[int] | list[tuple[int, int]]): Strides of anchors in multiple feature levels in order (w, h).
(the anchor strides of the multi-scale feature maps)
ratios (list[float]): The list of ratios between the height and width of anchors in a single level.
(the h/w ratios within a single level)
scales (list[int] | None): Anchor scales for anchors in a single level. It cannot be set at the same time if `octave_base_scale` and `scales_per_octave` are set.
(the anchor scales within a single level)
base_sizes (list[int] | None): The basic sizes of anchors in multiple levels. If None is given, strides will be used as base_sizes. (If strides are non square, the shortest stride is taken.)
(the base anchor sizes per level; when None, the strides are used as base_sizes)
Note: the strides from the config are therefore used as the base_size in the code.
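A quick way to confirm this (a minimal sketch using mmdet's AnchorGenerator; base_size falls back to the stride, as noted above):

from mmdet.core.anchor import AnchorGenerator

ag = AnchorGenerator(strides=[4], ratios=[0.5, 1.0, 2.0], scales=[8])
# three base anchors in (x1, y1, x2, y2) form, centered at the origin,
# with sides of roughly 45x23, 32x32 and 23x45 as computed above
print(ag.base_anchors[0])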
3. Building the model from the detector class CascadeRCNN
A word on the detector class. The model config sets type='CascadeRCNN', naming the detector class. The inheritance chain is:
- class CascadeRCNN(TwoStageDetector):
Path mmdet/models/detectors/cascade_rcnn.py. It only overrides __init__ and show_result.
- class TwoStageDetector(BaseDetector):
Path mmdet/models/detectors/two_stage.py. Its __init__ calls build_backbone(backbone), build_neck(neck), build_head(rpn_head_), and build_head(roi_head); these four calls assemble the model, and all four are backed by mmcv. The other methods of TwoStageDetector are not covered here for now.
- class BaseDetector(BaseModule, metaclass=ABCMeta):
Path mmdet/models/detectors/base.py. Not covered further here.
- BaseModule
Backed by mmcv as well, surprise! from mmcv.runner import BaseModule
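Putting the three ingredients together (a minimal sketch; in this config train_cfg/test_cfg already live inside the model dict, so cfg.model alone is enough):

from mmcv import Config
from mmdet.models import build_detector

cfg = Config.fromfile('configs/cascade_rcnn/cascade_mask_rcnn_r50_fpn_1x_coco.py')
model = build_detector(cfg.model)     # build_from_cfg runs behind the scenes
print(type(model).__name__)           # CascadeRCNN
print(type(model.backbone).__name__)  # ResNet
print(type(model.neck).__name__)      # FPN
print(type(model.roi_head).__name__)  # CascadeRoIHead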
4. backbone (ResNet)
Path mmdet/models/backbones/resnet.py
A standard ResNet. The forward pass is the stem conv followed by the four residual stages; per the config, the feature map of every requested stage is returned.
def forward(self, x):
"""Forward function."""
if self.deep_stem:
x = self.stem(x)
else:
x = self.conv1(x)
x = self.norm1(x)
x = self.relu(x)
x = self.maxpool(x)
outs = []
for i, layer_name in enumerate(self.res_layers):
res_layer = getattr(self, layer_name)
x = res_layer(x)
if i in self.out_indices: # out_indices=(0, 1, 2, 3)
outs.append(x)
return tuple(outs) # return the per-stage outputs as a tuple
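A quick shape check (a sketch with random input; 800x1216 is just an example size divisible by 32):

import torch
from mmdet.models import build_backbone

backbone = build_backbone(dict(type='ResNet', depth=50, num_stages=4,
                               out_indices=(0, 1, 2, 3)))
backbone.eval()
with torch.no_grad():
    outs = backbone(torch.randn(1, 3, 800, 1216))
print([tuple(o.shape) for o in outs])
# [(1, 256, 200, 304), (1, 512, 100, 152), (1, 1024, 50, 76), (1, 2048, 25, 38)]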
5. neck (FPN)
Path mmdet/models/necks/fpn.py
The feature pyramid implementation.
For a code walkthrough, see:
MMdetection之necks之FPN_落梅横笛已三更的博客-CSDN博客
@auto_fp16()
def forward(self, inputs):
"""Forward function."""
assert len(inputs) == len(self.in_channels) # in_channels=[256, 512, 1024, 2048],
# build laterals
# lateral 1x1 convs of the FPN: unify every input's channel count to 256
laterals = [
lateral_conv(inputs[i + self.start_level]) # start_level Default: 0.
for i, lateral_conv in enumerate(self.lateral_convs)
]
# build top-down path
# top-down path: upsample and add
used_backbone_levels = len(laterals)
for i in range(used_backbone_levels - 1, 0, -1):
# In some cases, fixing `scale factor` (e.g. 2) is preferred, but
# it cannot co-exist with `size` in `F.interpolate`.
if 'scale_factor' in self.upsample_cfg:
# fix runtime error of "+=" inplace operation in PyTorch 1.10
laterals[i - 1] = laterals[i - 1] + F.interpolate(
laterals[i], **self.upsample_cfg)
else:
prev_shape = laterals[i - 1].shape[2:]
laterals[i - 1] = laterals[i - 1] + F.interpolate(
laterals[i], size=prev_shape, **self.upsample_cfg)
# build outputs
# part 1: from original levels
# one more 3x3 conv to smooth out upsampling artifacts
outs = [
self.fpn_convs[i](laterals[i]) for i in range(used_backbone_levels)
]
# part 2: add extra levels
# add one extra output level: stride-2 max pooling on the smallest (/32) map yields the /64 map
if self.num_outs > len(outs):
# use max pool to get more levels on top of outputs
# (e.g., Faster R-CNN, Mask R-CNN)
if not self.add_extra_convs:
for i in range(self.num_outs - used_backbone_levels):
outs.append(F.max_pool2d(outs[-1], 1, stride=2))
# add conv layers on top of original feature maps (RetinaNet)
else:
if self.add_extra_convs == 'on_input':
extra_source = inputs[self.backbone_end_level - 1]
elif self.add_extra_convs == 'on_lateral':
extra_source = laterals[-1]
elif self.add_extra_convs == 'on_output':
extra_source = outs[-1]
else:
raise NotImplementedError
outs.append(self.fpn_convs[used_backbone_levels](extra_source))
for i in range(used_backbone_levels + 1, self.num_outs):
if self.relu_before_extra_convs:
outs.append(self.fpn_convs[i](F.relu(outs[-1])))
else:
outs.append(self.fpn_convs[i](outs[-1]))
return tuple(outs)
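Feeding the four backbone maps from above through the FPN shows the 4-in/5-out behavior (a sketch; random tensors stand in for real features):

import torch
from mmdet.models import build_neck

neck = build_neck(dict(type='FPN', in_channels=[256, 512, 1024, 2048],
                       out_channels=256, num_outs=5))
feats = [torch.randn(1, c, 800 // s, 1216 // s)
         for c, s in zip([256, 512, 1024, 2048], [4, 8, 16, 32])]
outs = neck(feats)
print([tuple(o.shape) for o in outs])
# 5 levels, all with 256 channels; the last is the extra stride-2 max-pooled map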
6. rpn_head (RPNHead)
Path mmdet/models/dense_heads/rpn_head.py. Inheritance:
- class RPNHead(AnchorHead),
- class AnchorHead(BaseDenseHead, BBoxTestMixin):
For source-code walkthroughs, see: mmdetection源码阅读笔记:AnchorHead - 知乎
小白MMdetection - FCOSHead源码精读学习笔记 - 知乎
AnchorHead is the classic anchor-based dense detection head; it contains the core computations such as get_anchors and loss.
RPNHead is the RPN head of two-stage detectors; it sets num_classes to 1 and only predicts 'is object or not'.
Part 1: AnchorHead
# Copyright (c) OpenMMLab. All rights reserved.
import warnings
import torch
import torch.nn as nn
from mmcv.runner import force_fp32
from mmdet.core import (anchor_inside_flags, build_assigner, build_bbox_coder,
build_prior_generator, build_sampler, images_to_levels,
multi_apply, unmap)
"""
这里可以关注下从mmdet.core import 的3个方法
anchor_inside_flags 判断每个anchor是否在距边界一定范围内
images_to_levels 根据每层的目标数量,将目标分层
multi_apply 对每个层都进行某个操作
"""
from ..builder import HEADS, build_loss
from .base_dense_head import BaseDenseHead
from .dense_test_mixins import BBoxTestMixin
@HEADS.register_module()
class AnchorHead(BaseDenseHead, BBoxTestMixin):
"""Anchor-based head (RPN, RetinaNet, SSD, etc.).
Args:
num_classes (int): Number of categories excluding the background
category.
in_channels (int): Number of channels in the input feature map.
feat_channels (int): Number of hidden channels. Used in child classes.
anchor_generator (dict): Config dict for anchor generator
bbox_coder (dict): Config of bounding box coder.
reg_decoded_bbox (bool): If true, the regression loss would be
applied directly on decoded bounding boxes, converting both
the predicted boxes and regression targets to absolute
coordinates format. Default False. It should be `True` when
using `IoULoss`, `GIoULoss`, or `DIoULoss` in the bbox head.
loss_cls (dict): Config of classification loss.
loss_bbox (dict): Config of localization loss.
train_cfg (dict): Training config of anchor head.
test_cfg (dict): Testing config of anchor head.
init_cfg (dict or list[dict], optional): Initialization config dict.
""" # noqa: W605
def __init__(self,
num_classes, # number of classes, excluding background
in_channels, # input channels, 256
feat_channels=256, # hidden channels
anchor_generator=dict( # anchor config
type='AnchorGenerator',
scales=[8, 16, 32],
ratios=[0.5, 1.0, 2.0],
strides=[4, 8, 16, 32, 64]),
bbox_coder=dict( # bbox coder config
type='DeltaXYWHBBoxCoder',
clip_border=True,
target_means=(.0, .0, .0, .0),
target_stds=(1.0, 1.0, 1.0, 1.0)),
reg_decoded_bbox=False, # True for losses on decoded boxes (IoULoss, DIoULoss, ...); False for delta losses such as SmoothL1Loss
loss_cls=dict( # classification loss config
type='CrossEntropyLoss',
use_sigmoid=True,
loss_weight=1.0),
loss_bbox=dict( # regression loss config
type='SmoothL1Loss', beta=1.0 / 9.0, loss_weight=1.0),
train_cfg=None,
test_cfg=None,
init_cfg=dict(type='Normal', layer='Conv2d', std=0.01)):
super(AnchorHead, self).__init__(init_cfg)
self.in_channels = in_channels
self.num_classes = num_classes
self.feat_channels = feat_channels
self.use_sigmoid_cls = loss_cls.get('use_sigmoid', False) # True in cascade mask rcnn
if self.use_sigmoid_cls:
self.cls_out_channels = num_classes # with sigmoid there is no background channel, so num_classes outputs; in RPNHead num_classes=1, hence 1
else:
self.cls_out_channels = num_classes + 1
if self.cls_out_channels <= 0:
raise ValueError(f'num_classes={num_classes} is too small')
self.reg_decoded_bbox = reg_decoded_bbox
self.bbox_coder = build_bbox_coder(bbox_coder)
self.loss_cls = build_loss(loss_cls)
self.loss_bbox = build_loss(loss_bbox)
self.train_cfg = train_cfg
self.test_cfg = test_cfg
# assigner and sampler
if self.train_cfg:
self.assigner = build_assigner(self.train_cfg.assigner)
if hasattr(self.train_cfg,
'sampler') and self.train_cfg.sampler.type.split(
'.')[-1] != 'PseudoSampler':
self.sampling = True
sampler_cfg = self.train_cfg.sampler
# avoid BC-breaking
if loss_cls['type'] in [
'FocalLoss', 'GHMC', 'QualityFocalLoss'
]:
warnings.warn(
'DeprecationWarning: Determining whether to sampling'
'by loss type is deprecated, please delete sampler in'
'your config when using `FocalLoss`, `GHMC`, '
'`QualityFocalLoss` or other FocalLoss variant.')
self.sampling = False
sampler_cfg = dict(type='PseudoSampler')
else:
self.sampling = False
sampler_cfg = dict(type='PseudoSampler')
self.sampler = build_sampler(sampler_cfg, context=self)
self.fp16_enabled = False
# prior (anchor) generator
self.prior_generator = build_prior_generator(anchor_generator)
# Usually the numbers of anchors for each level are the same
# except SSD detectors. So it is an int in the most dense
# heads but a list of int in SSDHead
self.num_base_priors = self.prior_generator.num_base_priors[0]
self._init_layers()
@property
def num_anchors(self):
# number of anchors per location
warnings.warn('DeprecationWarning: `num_anchors` is deprecated, '
'for consistency or also use '
'`num_base_priors` instead')
return self.prior_generator.num_base_priors[0]
@property
def anchor_generator(self):
# the prior/anchor generator
warnings.warn('DeprecationWarning: anchor_generator is deprecated, '
'please use "prior_generator" instead')
return self.prior_generator
def _init_layers(self):
# subclasses (e.g. RPNHead) override this method; in Faster R-CNN's RPN the convs become one 3x3 plus two 1x1
"""Initialize layers of the head."""
self.conv_cls = nn.Conv2d(self.in_channels,
self.num_base_priors * self.cls_out_channels,
1)
self.conv_reg = nn.Conv2d(self.in_channels, self.num_base_priors * 4,
1)
def forward_single(self, x):
# the head's conv ops for a single level; subclasses (e.g. RPNHead) override this method
"""Forward feature of a single scale level.
Args:
x (Tensor): Features of a single scale level.
Returns:
tuple:
cls_score (Tensor): Cls scores for a single scale level \
the channels number is num_base_priors * num_classes.
bbox_pred (Tensor): Box energies / deltas for a single scale \
level, the channels number is num_base_priors * 4.
"""
cls_score = self.conv_cls(x)
bbox_pred = self.conv_reg(x)
return cls_score, bbox_pred
def forward(self, feats):
# batch over levels: apply forward_single to every level
"""Forward features from the upstream network.
Args:
feats (tuple[Tensor]): Features from the upstream network, each is
a 4D-tensor.
Returns:
tuple: A tuple of classification scores and bbox prediction.
- cls_scores (list[Tensor]): Classification scores for all \
scale levels, each is a 4D-tensor, the channels number \
is num_base_priors * num_classes.
- bbox_preds (list[Tensor]): Box energies / deltas for all \
scale levels, each is a 4D-tensor, the channels number \
is num_base_priors * 4.
"""
return multi_apply(self.forward_single, feats)
def get_anchors(self, featmap_sizes, img_metas, device='cuda'):
# generate anchors for every level of every image; returns all nominal anchors plus per-anchor validity flags
"""Get anchors according to feature map sizes.
Args:
featmap_sizes (list[tuple]): Multi-level feature map sizes.
img_metas (list[dict]): Image meta info.
device (torch.device | str): Device for returned tensors
Returns:
tuple:
anchor_list (list[Tensor]): Anchors of each image.
valid_flag_list (list[Tensor]): Valid flags of each image.
"""
num_imgs = len(img_metas)
# since feature map sizes of all images are the same, we only compute
# anchors for one time
# multi-level anchors of a single image, returned as list[num_levels * tensor(H*W*num_base_anchors, 4)]
multi_level_anchors = self.prior_generator.grid_priors(
featmap_sizes, device=device)
# replicate the single image's anchors as every image's anchors
anchor_list = [multi_level_anchors for _ in range(num_imgs)]
# for each image, we compute valid flags of multi level anchors
# images are padded when batched, and the padding differs per image, so each image has its own invalid anchors
valid_flag_list = [] # bool: 1 valid, 0 invalid
for img_id, img_meta in enumerate(img_metas):
multi_level_flags = self.prior_generator.valid_flags(
featmap_sizes, img_meta['pad_shape'], device)
valid_flag_list.append(multi_level_flags)
return anchor_list, valid_flag_list
def _get_targets_single(self,
flat_anchors,
valid_flags,
gt_bboxes,
gt_bboxes_ignore,
gt_labels,
img_meta,
label_channels=1,
unmap_outputs=True):
# per-image target computation: take the flattened anchors, assign and sample them, and output each anchor's label and box targets
"""Compute regression and classification targets for anchors in a
single image.
Args:
flat_anchors (Tensor): Multi-level anchors of the image, which are
concatenated into a single tensor of shape (num_anchors ,4)
valid_flags (Tensor): Multi level valid flags of the image,
which are concatenated into a single tensor of
shape (num_anchors,).
gt_bboxes (Tensor): Ground truth bboxes of the image,
shape (num_gts, 4).
gt_bboxes_ignore (Tensor): Ground truth bboxes to be
ignored, shape (num_ignored_gts, 4).
img_meta (dict): Meta info of the image.
gt_labels (Tensor): Ground truth labels of each box,
shape (num_gts,).
label_channels (int): Channel of label.
unmap_outputs (bool): Whether to map outputs back to the original
set of anchors.
Returns:
tuple:
labels_list (list[Tensor]): Labels of each level (class of the matched gt at positive anchors)
label_weights_list (list[Tensor]): Label weights of each level (positives: 1 or the cfg pos_weight; negatives: 1; ignored: 0)
bbox_targets_list (list[Tensor]): BBox targets of each level (encoded gt box at positive anchors)
bbox_weights_list (list[Tensor]): BBox weights of each level (0 or 1: 1 at positives, 0 at negatives and ignored anchors)
num_total_pos (int): Number of positive samples in all images
num_total_neg (int): Number of negative samples in all images
"""
# filter: flag anchors that lie within the allowed image border
inside_flags = anchor_inside_flags(flat_anchors, valid_flags,
img_meta['img_shape'][:2],
self.train_cfg.allowed_border) # allowed_border=0 in cascade mask rcnn
if not inside_flags.any():
return (None, ) * 7
# assign gt and sample anchors
# filter: keep only the in-border anchors by index
anchors = flat_anchors[inside_flags, :]
# assign positives and negatives
assign_result = self.assigner.assign(
anchors, gt_bboxes, gt_bboxes_ignore,
None if self.sampling else gt_labels)
# sample positives/negatives; the result includes sampling_result.pos_gt_bboxes
sampling_result = self.sampler.sample(assign_result, anchors,
gt_bboxes)
# initialization
num_valid_anchors = anchors.shape[0]
bbox_targets = torch.zeros_like(anchors)
bbox_weights = torch.zeros_like(anchors)
# filled with num_classes; real classes are numbered from 0, so this value marks 'no object'
labels = anchors.new_full((num_valid_anchors, ),
self.num_classes,
dtype=torch.long)
# all zeros
label_weights = anchors.new_zeros(num_valid_anchors, dtype=torch.float)
pos_inds = sampling_result.pos_inds
neg_inds = sampling_result.neg_inds
if len(pos_inds) > 0:
if not self.reg_decoded_bbox: # delta losses regress encoded offsets; IoU-type losses use the raw gt boxes below
# encode
pos_bbox_targets = self.bbox_coder.encode(
sampling_result.pos_bboxes, sampling_result.pos_gt_bboxes)
else:
pos_bbox_targets = sampling_result.pos_gt_bboxes
bbox_targets[pos_inds, :] = pos_bbox_targets
bbox_weights[pos_inds, :] = 1.0 # box weight 1 for positives
if gt_labels is None:
# Only rpn gives gt_labels as None
# Foreground is the first class since v2.5.0
# in the two-stage RPN, positives get label 0, i.e. the foreground class
labels[pos_inds] = 0
else:
labels[pos_inds] = gt_labels[
sampling_result.pos_assigned_gt_inds]
if self.train_cfg.pos_weight <= 0:
label_weights[pos_inds] = 1.0
else:
label_weights[pos_inds] = self.train_cfg.pos_weight # positive weight is usually 1
if len(neg_inds) > 0:
label_weights[neg_inds] = 1.0 # negative weight is usually 1
# map up to original set of anchors
# unmap to the original anchor count; anchors in the padded border were removed earlier via the valid flags
if unmap_outputs:
num_total_anchors = flat_anchors.size(0)
labels = unmap(
labels, num_total_anchors, inside_flags,
fill=self.num_classes) # fill bg label
label_weights = unmap(label_weights, num_total_anchors,
inside_flags)
bbox_targets = unmap(bbox_targets, num_total_anchors, inside_flags)
bbox_weights = unmap(bbox_weights, num_total_anchors, inside_flags)
"""
labels rpn中正样本对应处为0或0 ~ num_class-1,负样本或无效样本为num_classes
label_weights 正样本对应处为1 或设定权重self.train_cfg.pos_weight;负样本为1;无效样本为0
bbox_targets 正样本对应处为gt box
bbox_weights 正样本对应处为 1,负样本或无效样本为0
pos_inds 正样本索引
neg_inds 负样本索引
sampling_result 正负样本采样结果
"""
return (labels, label_weights, bbox_targets, bbox_weights, pos_inds,
neg_inds, sampling_result)
def get_targets(self,
anchor_list,
valid_flag_list,
gt_bboxes_list,
img_metas,
gt_bboxes_ignore_list=None,
gt_labels_list=None,
label_channels=1,
unmap_outputs=True,
return_sampling_results=False):
# compute per-anchor classification and regression targets over multiple images; returns label targets, label weights, bbox targets, bbox weights
# flatten anchors within each image -> _get_targets_single -> images_to_levels
"""Compute regression and classification targets for anchors in
multiple images.
Args:
anchor_list (list[list[Tensor]]): Multi level anchors of each
image. The outer list indicates images, and the inner list
corresponds to feature levels of the image. Each element of
the inner list is a tensor of shape (num_anchors, 4).
valid_flag_list (list[list[Tensor]]): Multi level valid flags of
each image. The outer list indicates images, and the inner list
corresponds to feature levels of the image. Each element of
the inner list is a tensor of shape (num_anchors, )
gt_bboxes_list (list[Tensor]): Ground truth bboxes of each image.
img_metas (list[dict]): Meta info of each image.
gt_bboxes_ignore_list (list[Tensor]): Ground truth bboxes to be
ignored.
gt_labels_list (list[Tensor]): Ground truth labels of each box.
label_channels (int): Channel of label.
unmap_outputs (bool): Whether to map outputs back to the original
set of anchors.
Returns:
tuple: Usually returns a tuple containing learning targets.
- labels_list (list[Tensor]): Labels of each level.
- label_weights_list (list[Tensor]): Label weights of each
level.
- bbox_targets_list (list[Tensor]): BBox targets of each level.
- bbox_weights_list (list[Tensor]): BBox weights of each level.
- num_total_pos (int): Number of positive samples in all
images.
- num_total_neg (int): Number of negative samples in all
images.
additional_returns: This function enables user-defined returns from
`self._get_targets_single`. These returns are currently refined
to properties at each feature map (i.e. having HxW dimension).
The results will be concatenated after the end
"""
num_imgs = len(img_metas)
assert len(anchor_list) == len(valid_flag_list) == num_imgs
# anchor number of multi levels
# anchor count of each level
num_level_anchors = [anchors.size(0) for anchors in anchor_list[0]]
# concat all level anchors to a single tensor
# per-image anchors: flatten the anchors inside each image, compute targets on the flat anchors, then split back per level with images_to_levels
# 1. flatten
concat_anchor_list = []
concat_valid_flag_list = []
for i in range(num_imgs):
assert len(anchor_list[i]) == len(valid_flag_list[i])
concat_anchor_list.append(torch.cat(anchor_list[i]))
concat_valid_flag_list.append(torch.cat(valid_flag_list[i]))
# compute targets for each image
# 2. compute each image's targets
if gt_bboxes_ignore_list is None:
gt_bboxes_ignore_list = [None for _ in range(num_imgs)]
if gt_labels_list is None:
gt_labels_list = [None for _ in range(num_imgs)]
results = multi_apply( # compute cls/reg targets for every image
self._get_targets_single,
concat_anchor_list,
concat_valid_flag_list,
gt_bboxes_list,
gt_bboxes_ignore_list,
gt_labels_list,
img_metas,
label_channels=label_channels,
unmap_outputs=unmap_outputs)
(all_labels, all_label_weights, all_bbox_targets, all_bbox_weights,
pos_inds_list, neg_inds_list, sampling_results_list) = results[:7]
rest_results = list(results[7:]) # user-added return values
# no valid anchors
if any([labels is None for labels in all_labels]):
return None
# sampled anchors of all images
num_total_pos = sum([max(inds.numel(), 1) for inds in pos_inds_list])
num_total_neg = sum([max(inds.numel(), 1) for inds in neg_inds_list])
# split targets to a list w.r.t. multiple levels
# 3. split back to levels
labels_list = images_to_levels(all_labels, num_level_anchors)
label_weights_list = images_to_levels(all_label_weights,
num_level_anchors)
bbox_targets_list = images_to_levels(all_bbox_targets,
num_level_anchors)
bbox_weights_list = images_to_levels(all_bbox_weights,
num_level_anchors)
res = (labels_list, label_weights_list, bbox_targets_list,
bbox_weights_list, num_total_pos, num_total_neg)
if return_sampling_results: # include the sampling results in the returns
res = res + (sampling_results_list, )
for i, r in enumerate(rest_results): # user-added return values
rest_results[i] = images_to_levels(r, num_level_anchors)
return res + tuple(rest_results)
def loss_single(self, cls_score, bbox_pred, anchors, labels, label_weights,
bbox_targets, bbox_weights, num_total_samples):
# loss of one scale level over the batch; returns the classification and regression losses separately
"""Compute loss of a single scale level.
Args:
cls_score (Tensor): Box scores for each scale level
Has shape (N, num_anchors * num_classes, H, W). # note the multi-class channel layout of cls_score
bbox_pred (Tensor): Box energies / deltas for each scale
level with shape (N, num_anchors * 4, H, W). # note: num_anchors * 4 here, not further multiplied by num_classes
anchors (Tensor): Box reference for each scale level with shape
(N, num_total_anchors, 4).
labels (Tensor): Labels of each anchors with shape
(N, num_total_anchors).
label_weights (Tensor): Label weights of each anchor with shape
(N, num_total_anchors)
bbox_targets (Tensor): BBox regression targets of each anchor
weight shape (N, num_total_anchors, 4). # same shape as anchors
bbox_weights (Tensor): BBox regression loss weights of each anchor
with shape (N, num_total_anchors, 4).
num_total_samples (int): If sampling, num total samples equal to
the number of total anchors; Otherwise, it is the number of
positive anchors.
Returns:
dict[str, Tensor]: A dictionary of loss components.
"""
# classification loss
labels = labels.reshape(-1)
label_weights = label_weights.reshape(-1)
# B,num_anchors*cls_out_channels,H,W -> B,H,W,num_anchors*cls_out_channels -> B*H*W*num_anchors,cls_out_channels
cls_score = cls_score.permute(0, 2, 3,1).reshape(-1, self.cls_out_channels)
# classification loss
loss_cls = self.loss_cls(
cls_score, labels, label_weights, avg_factor=num_total_samples)
# regression loss
# regression loss
bbox_targets = bbox_targets.reshape(-1, 4)
bbox_weights = bbox_weights.reshape(-1, 4)
bbox_pred = bbox_pred.permute(0, 2, 3, 1).reshape(-1, 4)
if self.reg_decoded_bbox:
# IoU-type losses need the predicted boxes decoded first
# When the regression loss (e.g. `IouLoss`, `GIouLoss`)
# is applied directly on the decoded bounding boxes, it
# decodes the already encoded coordinates to absolute format.
anchors = anchors.reshape(-1, 4)
bbox_pred = self.bbox_coder.decode(anchors, bbox_pred)
loss_bbox = self.loss_bbox(
bbox_pred,
bbox_targets,
bbox_weights,
avg_factor=num_total_samples)
return loss_cls, loss_bbox
@force_fp32(apply_to=('cls_scores', 'bbox_preds'))
def loss(self,
cls_scores,
bbox_preds,
gt_bboxes,
gt_labels,
img_metas,
gt_bboxes_ignore=None):
"""Compute losses of the head.
Args:
cls_scores (list[Tensor]): Box scores for each scale level
Has shape (N, num_anchors * num_classes, H, W)
bbox_preds (list[Tensor]): Box energies / deltas for each scale
level with shape (N, num_anchors * 4, H, W)
gt_bboxes (list[Tensor]): Ground truth bboxes for each image with
shape (num_gts, 4) in [tl_x, tl_y, br_x, br_y] format.
gt_labels (list[Tensor]): class indices corresponding to each box
img_metas (list[dict]): Meta information of each image, e.g.,
image size, scaling factor, etc.
gt_bboxes_ignore (None | list[Tensor]): specify which bounding
boxes can be ignored when computing the loss. Default: None
Returns:
dict[str, Tensor]: A dictionary of loss components.
"""
featmap_sizes = [featmap.size()[-2:] for featmap in cls_scores]
assert len(featmap_sizes) == self.prior_generator.num_levels
device = cls_scores[0].device
# 1. get all the anchors; anchor_list has the form list[num_imgs * list[num_levels * tensor(H*W*num_anchors, 4)]]
# note: the anchor count is independent of the labels
anchor_list, valid_flag_list = self.get_anchors(
featmap_sizes, img_metas, device=device)
label_channels = self.cls_out_channels if self.use_sigmoid_cls else 1
# 2. assign and sample the anchors, then compute the targets
cls_reg_targets = self.get_targets(
anchor_list,
valid_flag_list,
gt_bboxes,
img_metas,
gt_bboxes_ignore_list=gt_bboxes_ignore,
gt_labels_list=gt_labels,
label_channels=label_channels)
if cls_reg_targets is None:
return None
(labels_list, label_weights_list, bbox_targets_list, bbox_weights_list,
num_total_pos, num_total_neg) = cls_reg_targets
# used for normalization
num_total_samples = (
num_total_pos + num_total_neg if self.sampling else num_total_pos)
# anchor number of multi levels
num_level_anchors = [anchors.size(0) for anchors in anchor_list[0]]
# concat all level anchors and flags to a single tensor
concat_anchor_list = []
for i in range(len(anchor_list)):
concat_anchor_list.append(torch.cat(anchor_list[i]))
all_anchor_list = images_to_levels(concat_anchor_list,
num_level_anchors)
# 3. compute the losses
losses_cls, losses_bbox = multi_apply(
self.loss_single,
cls_scores,
bbox_preds,
all_anchor_list,
labels_list,
label_weights_list,
bbox_targets_list,
bbox_weights_list,
num_total_samples=num_total_samples)
return dict(loss_cls=losses_cls, loss_bbox=losses_bbox)
def aug_test(self, feats, img_metas, rescale=False):
"""Test function with test time augmentation.
Args:
feats (list[Tensor]): the outer list indicates test-time
augmentations and inner Tensor should have a shape NxCxHxW,
which contains features for all images in the batch.
img_metas (list[list[dict]]): the outer list indicates test-time
augs (multiscale, flip, etc.) and the inner list indicates
images in a batch. each dict has image information.
rescale (bool, optional): Whether to rescale the results.
Defaults to False.
Returns:
list[tuple[Tensor, Tensor]]: Each item in result_list is 2-tuple.
The first item is ``bboxes`` with shape (n, 5), where
5 represent (tl_x, tl_y, br_x, br_y, score).
The shape of the second tensor in the tuple is ``labels``
with shape (n,), The length of list should always be 1.
"""
return self.aug_test_bboxes(feats, img_metas, rescale=rescale)
Part 2: RPNHead
On top of AnchorHead's two 1x1 convs, RPNHead adds one 3x3 conv.
# Copyright (c) OpenMMLab. All rights reserved.
import copy
import torch
import torch.nn as nn
import torch.nn.functional as F
from mmcv.cnn import ConvModule
from mmcv.ops import batched_nms
from ..builder import HEADS
from .anchor_head import AnchorHead
@HEADS.register_module()
class RPNHead(AnchorHead):
"""RPN head.
Args:
in_channels (int): Number of channels in the input feature map.
init_cfg (dict or list[dict], optional): Initialization config dict.
num_convs (int): Number of convolution layers in the head. Default 1.
""" # noqa: W605
def __init__(self,
in_channels,
init_cfg=dict(type='Normal', layer='Conv2d', std=0.01),
num_convs=1,
**kwargs):
self.num_convs = num_convs
super(RPNHead, self).__init__(
1, in_channels, init_cfg=init_cfg, **kwargs)
def _init_layers(self):
"""Initialize layers of the head."""
if self.num_convs > 1: # several 3x3 convs
rpn_convs = []
for i in range(self.num_convs):
if i == 0:
in_channels = self.in_channels
else:
in_channels = self.feat_channels
# use ``inplace=False`` to avoid error: one of the variables
# needed for gradient computation has been modified by an
# inplace operation.
rpn_convs.append(
ConvModule(
in_channels,
self.feat_channels,
3,
padding=1,
inplace=False))
self.rpn_conv = nn.Sequential(*rpn_convs)
else: # a single 3x3 conv
self.rpn_conv = nn.Conv2d(
self.in_channels, self.feat_channels, 3, padding=1)
self.rpn_cls = nn.Conv2d(self.feat_channels, # 1x1 conv
self.num_base_priors * self.cls_out_channels,
1)
self.rpn_reg = nn.Conv2d(self.feat_channels, self.num_base_priors * 4, # 1x1 conv
1)
def forward_single(self, x):
# run on every level
"""Forward feature map of a single scale level."""
x = self.rpn_conv(x) # the 3x3 conv first
x = F.relu(x, inplace=False)
rpn_cls_score = self.rpn_cls(x) # then the two 1x1 convs
rpn_bbox_pred = self.rpn_reg(x)
return rpn_cls_score, rpn_bbox_pred
def loss(self,
cls_scores,
bbox_preds,
gt_bboxes,
img_metas,
gt_bboxes_ignore=None):
"""Compute losses of the head.
Args:
cls_scores (list[Tensor]): Box scores for each scale level
Has shape (N, num_anchors * num_classes, H, W)
bbox_preds (list[Tensor]): Box energies / deltas for each scale
level with shape (N, num_anchors * 4, H, W)
gt_bboxes (list[Tensor]): Ground truth bboxes for each image with
shape (num_gts, 4) in [tl_x, tl_y, br_x, br_y] format.
img_metas (list[dict]): Meta information of each image, e.g.,
image size, scaling factor, etc.
gt_bboxes_ignore (None | list[Tensor]): specify which bounding
boxes can be ignored when computing the loss.
Returns:
dict[str, Tensor]: A dictionary of loss components.
"""
losses = super(RPNHead, self).loss(
cls_scores,
bbox_preds,
gt_bboxes,
None,
img_metas,
gt_bboxes_ignore=gt_bboxes_ignore)
return dict(
loss_rpn_cls=losses['loss_cls'], loss_rpn_bbox=losses['loss_bbox'])
def _get_bboxes_single(self,
cls_score_list,
bbox_pred_list,
score_factor_list,
mlvl_anchors,
img_meta,
cfg,
rescale=False,
with_nms=True,
**kwargs):
# for a single image: take each level's top nms_pre (e.g. 2000) boxes, then NMS and other post-processing to get the final boxes
"""Transform outputs of a single image into bbox predictions.
Args:
cls_score_list (list[Tensor]): Box scores from all scale # per-anchor classification scores of every level
levels of a single image, each item has shape
(num_anchors * num_classes, H, W).
bbox_pred_list (list[Tensor]): Box energies / deltas from
all scale levels of a single image, each item has
shape (num_anchors * 4, H, W).
score_factor_list (list[Tensor]): Score factor from all scale
levels of a single image. RPN head does not need this value.
mlvl_anchors (list[Tensor]): Anchors of all scale level
each item has shape (num_anchors, 4).
img_meta (dict): Image meta info.
cfg (mmcv.Config): Test / postprocessing configuration,
if None, test_cfg would be used.
rescale (bool): If True, return boxes in original image space.
Default: False.
with_nms (bool): If True, do nms before return boxes.
Default: True.
Returns:
Tensor: Labeled boxes in shape (n, 5), where the first 4 columns
are bounding box positions (tl_x, tl_y, br_x, br_y) and the
5-th column is a score between 0 and 1.
"""
cfg = self.test_cfg if cfg is None else cfg
cfg = copy.deepcopy(cfg)
img_shape = img_meta['img_shape']
# bboxes from different level should be independent during NMS,
# level_ids are used as labels for batched NMS to separate them
level_ids = []
mlvl_scores = []
mlvl_bbox_preds = []
mlvl_valid_anchors = []
nms_pre = cfg.get('nms_pre', -1)
# take the top 2000 of each level
for level_idx in range(len(cls_score_list)):
rpn_cls_score = cls_score_list[level_idx]
rpn_bbox_pred = bbox_pred_list[level_idx]
assert rpn_cls_score.size()[-2:] == rpn_bbox_pred.size()[-2:]
rpn_cls_score = rpn_cls_score.permute(1, 2, 0)
if self.use_sigmoid_cls:
rpn_cls_score = rpn_cls_score.reshape(-1)
scores = rpn_cls_score.sigmoid()
else:
rpn_cls_score = rpn_cls_score.reshape(-1, 2)
# We set FG labels to [0, num_class-1] and BG label to
# num_class in RPN head since mmdet v2.5, which is unified to
# be consistent with other head since mmdet v2.0. In mmdet v2.0
# to v2.4 we keep BG label as 0 and FG label as 1 in rpn head.
scores = rpn_cls_score.softmax(dim=1)[:, 0]
rpn_bbox_pred = rpn_bbox_pred.permute(1, 2, 0).reshape(-1, 4)
anchors = mlvl_anchors[level_idx]
# keep the top-scoring boxes and their anchors
if 0 < nms_pre < scores.shape[0]: # nms_pre=2000
# sort is faster than topk
# _, topk_inds = scores.topk(cfg.nms_pre)
ranked_scores, rank_inds = scores.sort(descending=True)
topk_inds = rank_inds[:nms_pre]
scores = ranked_scores[:nms_pre]
rpn_bbox_pred = rpn_bbox_pred[topk_inds, :]
anchors = anchors[topk_inds, :]
mlvl_scores.append(scores)
mlvl_bbox_preds.append(rpn_bbox_pred)
mlvl_valid_anchors.append(anchors)
level_ids.append(
scores.new_full((scores.size(0), ),
level_idx,
dtype=torch.long))
return self._bbox_post_process(mlvl_scores, mlvl_bbox_preds,
mlvl_valid_anchors, level_ids, cfg,
img_shape)
def _bbox_post_process(self, mlvl_scores, mlvl_bboxes, mlvl_valid_anchors,
level_ids, cfg, img_shape, **kwargs):
# post-processing: NMS, size filtering, top-k
"""bbox post-processing method.
Do the nms operation for bboxes in same level.
Args:
mlvl_scores (list[Tensor]): Box scores from all scale
levels of a single image, each item has shape
(num_bboxes, ).
mlvl_bboxes (list[Tensor]): Decoded bboxes from all scale
levels of a single image, each item has shape (num_bboxes, 4).
mlvl_valid_anchors (list[Tensor]): Anchors of all scale level
each item has shape (num_bboxes, 4).
level_ids (list[Tensor]): Indexes from all scale levels of a
single image, each item has shape (num_bboxes, ).
cfg (mmcv.Config): Test / postprocessing configuration,
if None, `self.test_cfg` would be used.
img_shape (tuple(int)): The shape of model's input image.
Returns:
Tensor: Labeled boxes in shape (n, 5), where the first 4 columns # no per-level structure in the output anymore
are bounding box positions (tl_x, tl_y, br_x, br_y) and the
5-th column is a score between 0 and 1.
"""
# concatenate all the levels
scores = torch.cat(mlvl_scores)
anchors = torch.cat(mlvl_valid_anchors)
rpn_bbox_pred = torch.cat(mlvl_bboxes)
proposals = self.bbox_coder.decode(
anchors, rpn_bbox_pred, max_shape=img_shape)
ids = torch.cat(level_ids)
# filter out boxes whose width or height is too small
if cfg.min_bbox_size >= 0:
w = proposals[:, 2] - proposals[:, 0]
h = proposals[:, 3] - proposals[:, 1]
valid_mask = (w > cfg.min_bbox_size) & (h > cfg.min_bbox_size)
if not valid_mask.all():
proposals = proposals[valid_mask]
scores = scores[valid_mask]
ids = ids[valid_mask]
# nms
if proposals.numel() > 0:
dets, _ = batched_nms(proposals, scores, ids, cfg.nms)
else:
return proposals.new_zeros(0, 5)
# keep the top boxes after NMS
return dets[:cfg.max_per_img] # max_per_img=2000
def onnx_export(self, x, img_metas):
"""Test without augmentation.
Args:
x (tuple[Tensor]): Features from the upstream network, each is
a 4D-tensor.
img_metas (list[dict]): Meta info of each image.
Returns:
Tensor: dets of shape [N, num_det, 5].
"""
cls_scores, bbox_preds = self(x)
assert len(cls_scores) == len(bbox_preds)
batch_bboxes, batch_scores = super(RPNHead, self).onnx_export(
cls_scores, bbox_preds, img_metas=img_metas, with_nms=False)
# Use ONNX::NonMaxSuppression in deployment
from mmdet.core.export import add_dummy_nms_for_onnx
cfg = copy.deepcopy(self.test_cfg)
score_threshold = cfg.nms.get('score_thr', 0.0)
nms_pre = cfg.get('deploy_nms_pre', -1)
# Different from the normal forward doing NMS level by level,
# we do NMS across all levels when exporting ONNX.
dets, _ = add_dummy_nms_for_onnx(batch_bboxes, batch_scores,
cfg.max_per_img,
cfg.nms.iou_threshold,
score_threshold, nms_pre,
cfg.max_per_img)
return dets
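As a sanity check of the head wiring (a minimal sketch; one random /4-level feature map stands in for the FPN output):

import torch
from mmdet.models import build_head

rpn = build_head(dict(
    type='RPNHead', in_channels=256, feat_channels=256,
    anchor_generator=dict(type='AnchorGenerator', scales=[8],
                          ratios=[0.5, 1.0, 2.0], strides=[4, 8, 16, 32, 64]),
    bbox_coder=dict(type='DeltaXYWHBBoxCoder',
                    target_means=[.0, .0, .0, .0],
                    target_stds=[1.0, 1.0, 1.0, 1.0]),
    loss_cls=dict(type='CrossEntropyLoss', use_sigmoid=True, loss_weight=1.0),
    loss_bbox=dict(type='SmoothL1Loss', beta=1.0 / 9.0, loss_weight=1.0)))
cls_score, bbox_pred = rpn.forward_single(torch.randn(1, 256, 200, 304))
# 3 base anchors per location: objectness has 3 channels, deltas have 3*4 = 12
print(cls_score.shape, bbox_pred.shape)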
7. roi_head (CascadeRoIHead)
Path mmdet/models/roi_heads/cascade_roi_head.py. Inheritance:
class CascadeRoIHead(BaseRoIHead, BBoxTestMixin, MaskTestMixin):
class BaseRoIHead(BaseModule, metaclass=ABCMeta):
Part 1: CascadeRoIHead
# Copyright (c) OpenMMLab. All rights reserved.
import numpy as np
import torch
import torch.nn as nn
from mmcv.runner import ModuleList
from mmdet.core import (bbox2result, bbox2roi, bbox_mapping, build_assigner,
build_sampler, merge_aug_bboxes, merge_aug_masks,
multiclass_nms)
from ..builder import HEADS, build_head, build_roi_extractor
from .base_roi_head import BaseRoIHead
from .test_mixins import BBoxTestMixin, MaskTestMixin
@HEADS.register_module()
class CascadeRoIHead(BaseRoIHead, BBoxTestMixin, MaskTestMixin):
"""Cascade roi head including one bbox head and one mask head.
https://arxiv.org/abs/1712.00726
"""
def __init__(self,
num_stages,
stage_loss_weights,
bbox_roi_extractor=None,
bbox_head=None,
mask_roi_extractor=None,
mask_head=None,
shared_head=None,
train_cfg=None,
test_cfg=None,
pretrained=None,
init_cfg=None):
assert bbox_roi_extractor is not None
assert bbox_head is not None
assert shared_head is None, \
'Shared head is not supported in Cascade RCNN anymore'
self.num_stages = num_stages
self.stage_loss_weights = stage_loss_weights
super(CascadeRoIHead, self).__init__(
bbox_roi_extractor=bbox_roi_extractor,
bbox_head=bbox_head,
mask_roi_extractor=mask_roi_extractor,
mask_head=mask_head,
shared_head=shared_head,
train_cfg=train_cfg,
test_cfg=test_cfg,
pretrained=pretrained,
init_cfg=init_cfg)
def init_bbox_head(self, bbox_roi_extractor, bbox_head):
# build the per-stage bbox_roi_extractor and bbox_head with mmcv's build_roi_extractor / build_head
"""Initialize box head and box roi extractor.
Args:
bbox_roi_extractor (dict): Config of box roi extractor.
bbox_head (dict): Config of box in box head.
"""
self.bbox_roi_extractor = ModuleList()
self.bbox_head = ModuleList()
if not isinstance(bbox_roi_extractor, list):
bbox_roi_extractor = [
bbox_roi_extractor for _ in range(self.num_stages)
]
if not isinstance(bbox_head, list):
bbox_head = [bbox_head for _ in range(self.num_stages)]
assert len(bbox_roi_extractor) == len(bbox_head) == self.num_stages
for roi_extractor, head in zip(bbox_roi_extractor, bbox_head):
self.bbox_roi_extractor.append(build_roi_extractor(roi_extractor))
self.bbox_head.append(build_head(head))
def init_mask_head(self, mask_roi_extractor, mask_head):
# build the per-stage mask_roi_extractor and mask_head with mmcv's build_roi_extractor / build_head
"""Initialize mask head and mask roi extractor.
Args:
mask_roi_extractor (dict): Config of mask roi extractor.
mask_head (dict): Config of mask in mask head.
"""
self.mask_head = nn.ModuleList()
if not isinstance(mask_head, list):
mask_head = [mask_head for _ in range(self.num_stages)]
assert len(mask_head) == self.num_stages
for head in mask_head:
self.mask_head.append(build_head(head))
if mask_roi_extractor is not None:
self.share_roi_extractor = False
self.mask_roi_extractor = ModuleList()
if not isinstance(mask_roi_extractor, list):
mask_roi_extractor = [
mask_roi_extractor for _ in range(self.num_stages)
]
assert len(mask_roi_extractor) == self.num_stages
for roi_extractor in mask_roi_extractor:
self.mask_roi_extractor.append(
build_roi_extractor(roi_extractor))
else:
self.share_roi_extractor = True # no separate mask extractor configured, so the mask branch reuses the bbox RoI extractor
self.mask_roi_extractor = self.bbox_roi_extractor
def init_assigner_sampler(self):
# build the per-stage rcnn assigners and samplers with mmcv's build_assigner / build_sampler
"""Initialize assigner and sampler for each stage."""
self.bbox_assigner = []
self.bbox_sampler = []
if self.train_cfg is not None:
for idx, rcnn_train_cfg in enumerate(self.train_cfg):
self.bbox_assigner.append(
build_assigner(rcnn_train_cfg.assigner))
self.current_stage = idx
self.bbox_sampler.append(
build_sampler(rcnn_train_cfg.sampler, context=self))
def forward_dummy(self, x, proposals):
# dummy forward through all stages of the bbox and mask branches
"""Dummy forward function."""
# bbox head
outs = ()
rois = bbox2roi([proposals]) # convert the boxes to RoI format, shape (n, 5): [batch_ind, x1, y1, x2, y2]
if self.with_bbox:
for i in range(self.num_stages):
bbox_results = self._bbox_forward(i, x, rois)
outs = outs + (bbox_results['cls_score'],
bbox_results['bbox_pred'])
# mask heads
if self.with_mask:
mask_rois = rois[:100]
for i in range(self.num_stages):
mask_results = self._mask_forward(i, x, mask_rois)
outs = outs + (mask_results['mask_pred'], )
return outs
def _bbox_forward(self, stage, x, rois):
# bbox branch: produce cls scores and box deltas
"""Box head forward function used in both training and testing."""
bbox_roi_extractor = self.bbox_roi_extractor[stage] # pick this stage's bbox_roi_extractor
# bbox_roi_extractor first, then bbox_head
bbox_head = self.bbox_head[stage]
bbox_feats = bbox_roi_extractor(x[:bbox_roi_extractor.num_inputs],
rois)
# do not support caffe_c4 model anymore
cls_score, bbox_pred = bbox_head(bbox_feats)
bbox_results = dict(
cls_score=cls_score, bbox_pred=bbox_pred, bbox_feats=bbox_feats)
return bbox_results
def _bbox_forward_train(self, stage, x, sampling_results, gt_bboxes,
gt_labels, rcnn_train_cfg):
# forward first, then the loss against the targets
"""Run forward function and calculate loss for box head in training."""
rois = bbox2roi([res.bboxes for res in sampling_results])
bbox_results = self._bbox_forward(stage, x, rois)
bbox_targets = self.bbox_head[stage].get_targets(
sampling_results, gt_bboxes, gt_labels, rcnn_train_cfg)
loss_bbox = self.bbox_head[stage].loss(bbox_results['cls_score'],
bbox_results['bbox_pred'], rois,
*bbox_targets)
bbox_results.update(
loss_bbox=loss_bbox, rois=rois, bbox_targets=bbox_targets)
return bbox_results
def _mask_forward(self, stage, x, rois):
"""Mask head forward function used in both training and testing."""
# mask branch: produce the mask predictions
mask_roi_extractor = self.mask_roi_extractor[stage]
mask_head = self.mask_head[stage]
# mask_roi_extractor first, then mask_head
# note: mask_roi_extractor is distinct from bbox_roi_extractor (14x14 output vs 7x7)
mask_feats = mask_roi_extractor(x[:mask_roi_extractor.num_inputs],
rois)
# do not support caffe_c4 model anymore
mask_pred = mask_head(mask_feats)
mask_results = dict(mask_pred=mask_pred)
return mask_results
def _mask_forward_train(self,
stage,
x,
sampling_results,
gt_masks,
rcnn_train_cfg,
bbox_feats=None):
"""Run forward function and calculate loss for mask head in
training."""
# 前向、计算和target的loss_mask
pos_rois = bbox2roi([res.pos_bboxes for res in sampling_results])
mask_results = self._mask_forward(stage, x, pos_rois)
mask_targets = self.mask_head[stage].get_targets(
sampling_results, gt_masks, rcnn_train_cfg)
pos_labels = torch.cat([res.pos_gt_labels for res in sampling_results])
loss_mask = self.mask_head[stage].loss(mask_results['mask_pred'],
mask_targets, pos_labels)
mask_results.update(loss_mask=loss_mask)
return mask_results
def forward_train(self,
x,
img_metas,
proposal_list,
gt_bboxes,
gt_labels,
gt_bboxes_ignore=None,
gt_masks=None):
"""
Args:
x (list[Tensor]): list of multi-level img features.
img_metas (list[dict]): list of image info dict where each dict
has: 'img_shape', 'scale_factor', 'flip', and may also contain
'filename', 'ori_shape', 'pad_shape', and 'img_norm_cfg'.
For details on the values of these keys see
`mmdet/datasets/pipelines/formatting.py:Collect`.
proposals (list[Tensors]): list of region proposals.
gt_bboxes (list[Tensor]): Ground truth bboxes for each image with
shape (num_gts, 4) in [tl_x, tl_y, br_x, br_y] format.
gt_labels (list[Tensor]): class indices corresponding to each box
gt_bboxes_ignore (None | list[Tensor]): specify which bounding
boxes can be ignored when computing the loss.
gt_masks (None | Tensor) : true segmentation masks for each box
used if the architecture supports a segmentation task.
Returns:
dict[str, Tensor]: a dictionary of loss components
"""
losses = dict()
for i in range(self.num_stages): # num_stages = 3
self.current_stage = i
rcnn_train_cfg = self.train_cfg[i]
lw = self.stage_loss_weights[i]
# assign gts and sample proposals
sampling_results = []
if self.with_bbox or self.with_mask:
bbox_assigner = self.bbox_assigner[i]
bbox_sampler = self.bbox_sampler[i]
num_imgs = len(img_metas)
if gt_bboxes_ignore is None:
gt_bboxes_ignore = [None for _ in range(num_imgs)]
for j in range(num_imgs):
assign_result = bbox_assigner.assign( # MaxIoUAssigner
proposal_list[j], gt_bboxes[j], gt_bboxes_ignore[j],
gt_labels[j])
sampling_result = bbox_sampler.sample( # RandomSampler
assign_result,
proposal_list[j],
gt_bboxes[j],
gt_labels[j],
feats=[lvl_feat[j][None] for lvl_feat in x])
sampling_results.append(sampling_result)
# the two branches follow
# bbox head forward and loss
bbox_results = self._bbox_forward_train(i, x, sampling_results,
gt_bboxes, gt_labels,
rcnn_train_cfg)
for name, value in bbox_results['loss_bbox'].items():
losses[f's{i}.{name}'] = (
value * lw if 'loss' in name else value)
# mask head forward and loss
if self.with_mask:
mask_results = self._mask_forward_train(
i, x, sampling_results, gt_masks, rcnn_train_cfg,
bbox_results['bbox_feats'])
for name, value in mask_results['loss_mask'].items():
losses[f's{i}.{name}'] = (
value * lw if 'loss' in name else value)
# refine bboxes
# in the cascade, the refined boxes of one stage become the next stage's proposal_list
if i < self.num_stages - 1:
pos_is_gts = [res.pos_is_gt for res in sampling_results]
# bbox_targets is a tuple
roi_labels = bbox_results['bbox_targets'][0]
with torch.no_grad():
cls_score = bbox_results['cls_score']
if self.bbox_head[i].custom_activation:
cls_score = self.bbox_head[i].loss_cls.get_activation(
cls_score)
# Empty proposal.
if cls_score.numel() == 0:
break
roi_labels = torch.where(
roi_labels == self.bbox_head[i].num_classes,
cls_score[:, :-1].argmax(1), roi_labels)
proposal_list = self.bbox_head[i].refine_bboxes(
bbox_results['rois'], roi_labels,
bbox_results['bbox_pred'], pos_is_gts, img_metas)
return losses
def simple_test(self, x, proposal_list, img_metas, rescale=False):
# plain test without augmentation
"""Test without augmentation.
Args:
x (tuple[Tensor]): Features from upstream network. Each
has shape (batch_size, c, h, w).
proposal_list (list(Tensor)): Proposals from rpn head.
Each has shape (num_proposals, 5), last dimension
5 represent (x1, y1, x2, y2, score).
img_metas (list[dict]): Meta information of images.
rescale (bool): Whether to rescale the results to
the original image. Default: True.
Returns:
list[list[np.ndarray]] or list[tuple]: When no mask branch,
it is bbox results of each image and classes with type
`list[list[np.ndarray]]`. The outer list
corresponds to each image. The inner list
corresponds to each class. When the model has mask branch,
it contains bbox results and mask results.
The outer list corresponds to each image, and first element
of tuple is bbox results, second element is mask results.
"""
assert self.with_bbox, 'Bbox head must be implemented.'
num_imgs = len(proposal_list)
img_shapes = tuple(meta['img_shape'] for meta in img_metas)
ori_shapes = tuple(meta['ori_shape'] for meta in img_metas)
scale_factors = tuple(meta['scale_factor'] for meta in img_metas)
# "ms" in variable names means multi-stage
ms_bbox_result = {}
ms_segm_result = {}
ms_scores = []
rcnn_test_cfg = self.test_cfg
rois = bbox2roi(proposal_list)
if rois.shape[0] == 0:
# There is no proposal in the whole batch
bbox_results = [[
np.zeros((0, 5), dtype=np.float32)
for _ in range(self.bbox_head[-1].num_classes)
]] * num_imgs
if self.with_mask:
mask_classes = self.mask_head[-1].num_classes
segm_results = [[[] for _ in range(mask_classes)]
for _ in range(num_imgs)]
results = list(zip(bbox_results, segm_results))
else:
results = bbox_results
return results
for i in range(self.num_stages):
# bbox + cls branch
bbox_results = self._bbox_forward(i, x, rois)
# split batch bbox prediction back to each image
cls_score = bbox_results['cls_score']
bbox_pred = bbox_results['bbox_pred']
num_proposals_per_img = tuple(
len(proposals) for proposals in proposal_list)
rois = rois.split(num_proposals_per_img, 0)
cls_score = cls_score.split(num_proposals_per_img, 0)
if isinstance(bbox_pred, torch.Tensor):
bbox_pred = bbox_pred.split(num_proposals_per_img, 0)
else:
bbox_pred = self.bbox_head[i].bbox_pred_split(
bbox_pred, num_proposals_per_img)
ms_scores.append(cls_score)
# cascade: refine the rois for the next stage
if i < self.num_stages - 1:
if self.bbox_head[i].custom_activation:
cls_score = [
self.bbox_head[i].loss_cls.get_activation(s)
for s in cls_score
]
refine_rois_list = []
for j in range(num_imgs):
if rois[j].shape[0] > 0:
bbox_label = cls_score[j][:, :-1].argmax(dim=1)
refined_rois = self.bbox_head[i].regress_by_class(
rois[j], bbox_label, bbox_pred[j], img_metas[j])
refine_rois_list.append(refined_rois)
rois = torch.cat(refine_rois_list)
# average scores of each image by stages
cls_score = [
sum([score[i] for score in ms_scores]) / float(len(ms_scores))
for i in range(num_imgs)
]
# apply bbox post-processing to each image individually
det_bboxes = []
det_labels = []
for i in range(num_imgs):
det_bbox, det_label = self.bbox_head[-1].get_bboxes(
rois[i],
cls_score[i],
bbox_pred[i],
img_shapes[i],
scale_factors[i],
rescale=rescale,
cfg=rcnn_test_cfg)
det_bboxes.append(det_bbox)
det_labels.append(det_label)
bbox_results = [
bbox2result(det_bboxes[i], det_labels[i],
self.bbox_head[-1].num_classes)
for i in range(num_imgs)
]
ms_bbox_result['ensemble'] = bbox_results
# mask branch
if self.with_mask:
if all(det_bbox.shape[0] == 0 for det_bbox in det_bboxes):
mask_classes = self.mask_head[-1].num_classes
segm_results = [[[] for _ in range(mask_classes)]
for _ in range(num_imgs)]
else:
if rescale and not isinstance(scale_factors[0], float):
scale_factors = [
torch.from_numpy(scale_factor).to(det_bboxes[0].device)
for scale_factor in scale_factors
]
_bboxes = [
det_bboxes[i][:, :4] *
scale_factors[i] if rescale else det_bboxes[i][:, :4]
for i in range(len(det_bboxes))
]
mask_rois = bbox2roi(_bboxes)
num_mask_rois_per_img = tuple(
_bbox.size(0) for _bbox in _bboxes)
aug_masks = []
for i in range(self.num_stages):
mask_results = self._mask_forward(i, x, mask_rois)
mask_pred = mask_results['mask_pred']
# split batch mask prediction back to each image
mask_pred = mask_pred.split(num_mask_rois_per_img, 0)
aug_masks.append([
m.sigmoid().cpu().detach().numpy() for m in mask_pred
])
# apply mask post-processing to each image individually
segm_results = []
for i in range(num_imgs):
if det_bboxes[i].shape[0] == 0:
segm_results.append(
[[]
for _ in range(self.mask_head[-1].num_classes)])
else:
aug_mask = [mask[i] for mask in aug_masks]
merged_masks = merge_aug_masks(
aug_mask, [[img_metas[i]]] * self.num_stages,
rcnn_test_cfg)
segm_result = self.mask_head[-1].get_seg_masks(
merged_masks, _bboxes[i], det_labels[i],
rcnn_test_cfg, ori_shapes[i], scale_factors[i],
rescale)
segm_results.append(segm_result)
ms_segm_result['ensemble'] = segm_results
if self.with_mask:
results = list(
zip(ms_bbox_result['ensemble'], ms_segm_result['ensemble']))
else:
results = ms_bbox_result['ensemble']
return results
def aug_test(self, features, proposal_list, img_metas, rescale=False):
"""Test with augmentations.
If rescale is False, then returned bboxes and masks will fit the scale
of imgs[0].
"""
rcnn_test_cfg = self.test_cfg
aug_bboxes = []
aug_scores = []
for x, img_meta in zip(features, img_metas):
# only one image in the batch
img_shape = img_meta[0]['img_shape']
scale_factor = img_meta[0]['scale_factor']
flip = img_meta[0]['flip']
flip_direction = img_meta[0]['flip_direction']
proposals = bbox_mapping(proposal_list[0][:, :4], img_shape,
scale_factor, flip, flip_direction)
# "ms" in variable names means multi-stage
ms_scores = []
rois = bbox2roi([proposals])
if rois.shape[0] == 0:
# There is no proposal in the single image
aug_bboxes.append(rois.new_zeros(0, 4))
aug_scores.append(rois.new_zeros(0, 1))
continue
for i in range(self.num_stages):
bbox_results = self._bbox_forward(i, x, rois)
ms_scores.append(bbox_results['cls_score'])
if i < self.num_stages - 1:
cls_score = bbox_results['cls_score']
if self.bbox_head[i].custom_activation:
cls_score = self.bbox_head[i].loss_cls.get_activation(
cls_score)
bbox_label = cls_score[:, :-1].argmax(dim=1)
rois = self.bbox_head[i].regress_by_class(
rois, bbox_label, bbox_results['bbox_pred'],
img_meta[0])
cls_score = sum(ms_scores) / float(len(ms_scores))
bboxes, scores = self.bbox_head[-1].get_bboxes(
rois,
cls_score,
bbox_results['bbox_pred'],
img_shape,
scale_factor,
rescale=False,
cfg=None)
aug_bboxes.append(bboxes)
aug_scores.append(scores)
# after merging, bboxes will be rescaled to the original image size
merged_bboxes, merged_scores = merge_aug_bboxes(
aug_bboxes, aug_scores, img_metas, rcnn_test_cfg)
det_bboxes, det_labels = multiclass_nms(merged_bboxes, merged_scores,
rcnn_test_cfg.score_thr,
rcnn_test_cfg.nms,
rcnn_test_cfg.max_per_img)
bbox_result = bbox2result(det_bboxes, det_labels,
self.bbox_head[-1].num_classes)
if self.with_mask:
if det_bboxes.shape[0] == 0:
segm_result = [[]
for _ in range(self.mask_head[-1].num_classes)]
else:
aug_masks = []
aug_img_metas = []
for x, img_meta in zip(features, img_metas):
img_shape = img_meta[0]['img_shape']
scale_factor = img_meta[0]['scale_factor']
flip = img_meta[0]['flip']
flip_direction = img_meta[0]['flip_direction']
_bboxes = bbox_mapping(det_bboxes[:, :4], img_shape,
scale_factor, flip, flip_direction)
mask_rois = bbox2roi([_bboxes])
for i in range(self.num_stages):
mask_results = self._mask_forward(i, x, mask_rois)
aug_masks.append(
mask_results['mask_pred'].sigmoid().cpu().numpy())
aug_img_metas.append(img_meta)
merged_masks = merge_aug_masks(aug_masks, aug_img_metas,
self.test_cfg)
ori_shape = img_metas[0][0]['ori_shape']
dummy_scale_factor = np.ones(4)
segm_result = self.mask_head[-1].get_seg_masks(
merged_masks,
det_bboxes,
det_labels,
rcnn_test_cfg,
ori_shape,
scale_factor=dummy_scale_factor,
rescale=False)
return [(bbox_result, segm_result)]
else:
return [bbox_result]
def onnx_export(self, x, proposals, img_metas):
assert self.with_bbox, 'Bbox head must be implemented.'
assert proposals.shape[0] == 1, 'Only support one input image ' \
'while in exporting to ONNX'
# remove the scores
rois = proposals[..., :-1]
batch_size = rois.shape[0]
num_proposals_per_img = rois.shape[1]
# Eliminate the batch dimension
rois = rois.view(-1, 4)
# add dummy batch index
rois = torch.cat([rois.new_zeros(rois.shape[0], 1), rois], dim=-1)
max_shape = img_metas[0]['img_shape_for_onnx']
ms_scores = []
rcnn_test_cfg = self.test_cfg
for i in range(self.num_stages):
bbox_results = self._bbox_forward(i, x, rois)
cls_score = bbox_results['cls_score']
bbox_pred = bbox_results['bbox_pred']
# Recover the batch dimension
rois = rois.reshape(batch_size, num_proposals_per_img,
rois.size(-1))
cls_score = cls_score.reshape(batch_size, num_proposals_per_img,
cls_score.size(-1))
bbox_pred = bbox_pred.reshape(batch_size, num_proposals_per_img, 4)
ms_scores.append(cls_score)
if i < self.num_stages - 1:
assert self.bbox_head[i].reg_class_agnostic
new_rois = self.bbox_head[i].bbox_coder.decode(
rois[..., 1:], bbox_pred, max_shape=max_shape)
rois = new_rois.reshape(-1, new_rois.shape[-1])
# add dummy batch index
rois = torch.cat([rois.new_zeros(rois.shape[0], 1), rois],
dim=-1)
cls_score = sum(ms_scores) / float(len(ms_scores))
bbox_pred = bbox_pred.reshape(batch_size, num_proposals_per_img, 4)
rois = rois.reshape(batch_size, num_proposals_per_img, -1)
det_bboxes, det_labels = self.bbox_head[-1].onnx_export(
rois, cls_score, bbox_pred, max_shape, cfg=rcnn_test_cfg)
if not self.with_mask:
return det_bboxes, det_labels
else:
batch_index = torch.arange(
det_bboxes.size(0),
device=det_bboxes.device).float().view(-1, 1, 1).expand(
det_bboxes.size(0), det_bboxes.size(1), 1)
rois = det_bboxes[..., :4]
mask_rois = torch.cat([batch_index, rois], dim=-1)
mask_rois = mask_rois.view(-1, 5)
aug_masks = []
for i in range(self.num_stages):
mask_results = self._mask_forward(i, x, mask_rois)
mask_pred = mask_results['mask_pred']
aug_masks.append(mask_pred)
max_shape = img_metas[0]['img_shape_for_onnx']
# calculate the mean of the masks from the several stages
mask_pred = sum(aug_masks) / len(aug_masks)
segm_results = self.mask_head[-1].onnx_export(
mask_pred, rois.reshape(-1, 4), det_labels.reshape(-1),
self.test_cfg, max_shape)
segm_results = segm_results.reshape(batch_size,
det_bboxes.shape[1],
max_shape[0], max_shape[1])
return det_bboxes, det_labels, segm_results
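One detail worth highlighting in all three test paths above (simple_test, aug_test, onnx_export): the classification scores are ensembled by averaging the per-stage outputs collected in ms_scores, while the final boxes come only from the last stage's regression, and the mask predictions are likewise averaged over the three stages (via merge_aug_masks, or a plain mean in onnx_export). A minimal self-contained sketch of this averaging with toy tensors (shapes and values are illustrative assumptions, not mmdetection API):
import torch

num_stages, num_props, num_classes = 3, 4, 81  # toy sizes for illustration

# one (num_props, num_classes) score tensor per stage, as collected in ms_scores
ms_scores = [torch.softmax(torch.randn(num_props, num_classes), dim=1)
             for _ in range(num_stages)]

# classification: mean over stages, mirroring cls_score = sum(ms_scores) / float(len(ms_scores))
cls_score = sum(ms_scores) / float(len(ms_scores))

# masks: mean of the per-stage sigmoid outputs, as in the onnx_export path above
aug_masks = [torch.rand(num_props, 28, 28) for _ in range(num_stages)]
mask_pred = sum(aug_masks) / len(aug_masks)

print(cls_score.shape, mask_pred.shape)  # torch.Size([4, 81]) torch.Size([4, 28, 28])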
Part 2: BaseRoIHead
# Copyright (c) OpenMMLab. All rights reserved.
from abc import ABCMeta, abstractmethod
from mmcv.runner import BaseModule
from ..builder import build_shared_head
class BaseRoIHead(BaseModule, metaclass=ABCMeta):
"""Base class for RoIHeads."""
def __init__(self,
bbox_roi_extractor=None,
bbox_head=None,
mask_roi_extractor=None,
mask_head=None,
shared_head=None,
train_cfg=None,
test_cfg=None,
pretrained=None,
init_cfg=None):
super(BaseRoIHead, self).__init__(init_cfg)
self.train_cfg = train_cfg
self.test_cfg = test_cfg
if shared_head is not None:
shared_head.pretrained = pretrained
self.shared_head = build_shared_head(shared_head)
if bbox_head is not None:
self.init_bbox_head(bbox_roi_extractor, bbox_head)
if mask_head is not None:
self.init_mask_head(mask_roi_extractor, mask_head)
self.init_assigner_sampler()
@property
def with_bbox(self):
"""bool: whether the RoI head contains a `bbox_head`"""
return hasattr(self, 'bbox_head') and self.bbox_head is not None
@property
def with_mask(self):
"""bool: whether the RoI head contains a `mask_head`"""
return hasattr(self, 'mask_head') and self.mask_head is not None
@property
def with_shared_head(self):
"""bool: whether the RoI head contains a `shared_head`"""
return hasattr(self, 'shared_head') and self.shared_head is not None
@abstractmethod
def init_bbox_head(self):
"""Initialize ``bbox_head``"""
pass
@abstractmethod
def init_mask_head(self):
"""Initialize ``mask_head``"""
pass
@abstractmethod
def init_assigner_sampler(self):
"""Initialize assigner and sampler."""
pass
@abstractmethod
def forward_train(self,
x,
img_meta,
proposal_list,
gt_bboxes,
gt_labels,
gt_bboxes_ignore=None,
gt_masks=None,
**kwargs):
"""Forward function during training."""
async def async_simple_test(self,
x,
proposal_list,
img_metas,
proposals=None,
rescale=False,
**kwargs):
"""Asynchronized test function."""
raise NotImplementedError
def simple_test(self,
x,
proposal_list,
img_meta,
proposals=None,
rescale=False,
**kwargs):
"""Test without augmentation."""
def aug_test(self, x, proposal_list, img_metas, rescale=False, **kwargs):
"""Test with augmentations.
If rescale is False, then returned bboxes and masks will fit the scale
of imgs[0].
"""
8、Visualization
mmdet/core/visualization/image.py
Focus on these two functions: draw_masks and imshow_det_bboxes.
# Copyright (c) OpenMMLab. All rights reserved.
import sys
import cv2
import matplotlib.pyplot as plt
import mmcv
import numpy as np
import pycocotools.mask as mask_util
from matplotlib.collections import PatchCollection
from matplotlib.patches import Polygon
from mmdet.core.evaluation.panoptic_utils import INSTANCE_OFFSET
from ..mask.structures import bitmap_to_polygon
from ..utils import mask2ndarray
from .palette import get_palette, palette_val
__all__ = [
'color_val_matplotlib', 'draw_masks', 'draw_bboxes', 'draw_labels',
'imshow_det_bboxes', 'imshow_gt_det_bboxes'
]
EPS = 1e-2
def color_val_matplotlib(color):
"""Convert various input in BGR order to normalized RGB matplotlib color
tuples.
Args:
color (:obj`Color` | str | tuple | int | ndarray): Color inputs.
Returns:
tuple[float]: A tuple of 3 normalized floats indicating RGB channels.
"""
color = mmcv.color_val(color)
color = [color / 255 for color in color[::-1]]
return tuple(color)
def _get_adaptive_scales(areas, min_area=800, max_area=30000):
"""Get adaptive scales according to areas.
The scale range is [0.5, 1.0]. When the area is less than
``min_area``, the scale is 0.5; when the area is larger than
``max_area``, the scale is 1.0.
Args:
areas (ndarray): The areas of bboxes or masks with the
shape of (n, ).
min_area (int): Lower bound areas for adaptive scales.
Default: 800.
max_area (int): Upper bound areas for adaptive scales.
Default: 30000.
Returns:
ndarray: The adaptive scales with the shape of (n, ).
"""
scales = 0.5 + (areas - min_area) / (max_area - min_area)
scales = np.clip(scales, 0.5, 1.0)
return scales
def _get_bias_color(base, max_dist=30):
"""Get different colors for each masks.
Get different colors for each masks by adding a bias
color to the base category color.
Args:
base (ndarray): The base category color with the shape
of (3, ).
max_dist (int): The max distance of bias. Default: 30.
Returns:
ndarray: The new color for a mask with the shape of (3, ).
"""
new_color = base + np.random.randint(
low=-max_dist, high=max_dist + 1, size=3)
return np.clip(new_color, 0, 255, new_color)
def draw_bboxes(ax, bboxes, color='g', alpha=0.8, thickness=2):
"""Draw bounding boxes on the axes.
Args:
ax (matplotlib.Axes): The input axes.
bboxes (ndarray): The input bounding boxes with the shape
of (n, 4).
color (list[tuple] | matplotlib.color): The colors for each
bounding box.
alpha (float): Transparency of bounding boxes. Default: 0.8.
thickness (int): Thickness of lines. Default: 2.
Returns:
matplotlib.Axes: The result axes.
"""
polygons = []
for i, bbox in enumerate(bboxes):
bbox_int = bbox.astype(np.int32)
poly = [[bbox_int[0], bbox_int[1]], [bbox_int[0], bbox_int[3]],
[bbox_int[2], bbox_int[3]], [bbox_int[2], bbox_int[1]]]
np_poly = np.array(poly).reshape((4, 2))
polygons.append(Polygon(np_poly))
p = PatchCollection(
polygons,
facecolor='none',
edgecolors=color,
linewidths=thickness,
alpha=alpha)
ax.add_collection(p)
return ax
def draw_labels(ax,
labels,
positions,
scores=None,
class_names=None,
color='w',
font_size=8,
scales=None,
horizontal_alignment='left'):
"""Draw labels on the axes.
Args:
ax (matplotlib.Axes): The input axes.
labels (ndarray): The labels with the shape of (n, ).
positions (ndarray): The positions to draw each label.
scores (ndarray): The scores for each label.
class_names (list[str]): The class names.
color (list[tuple] | matplotlib.color): The colors for labels.
font_size (int): Font size of texts. Default: 8.
scales (list[float]): Scales of texts. Default: None.
horizontal_alignment (str): The horizontal alignment method of
texts. Default: 'left'.
Returns:
matplotlib.Axes: The result axes.
"""
for i, (pos, label) in enumerate(zip(positions, labels)):
label_text = class_names[
label] if class_names is not None else f'class {label}'
if scores is not None:
label_text += f'|{scores[i]:.02f}'
text_color = color[i] if isinstance(color, list) else color
font_size_mask = font_size if scales is None else font_size * scales[i]
ax.text(
pos[0],
pos[1],
f'{label_text}',
bbox={
'facecolor': 'black',
'alpha': 0.8,
'pad': 0.7,
'edgecolor': 'none'
},
color=text_color,
fontsize=font_size_mask,
verticalalignment='top',
horizontalalignment=horizontal_alignment)
return ax
def draw_masks(ax, img, masks, color=None, with_edge=True, alpha=0.8):
"""Draw masks on the image and their edges on the axes.
Args:
ax (matplotlib.Axes): The input axes.
img (ndarray): The image with the shape of (h, w, 3).
masks (ndarray): The masks with the shape of (n, h, w).
color (ndarray): The colors for each mask with the shape
of (n, 3).
with_edge (bool): Whether to draw edges. Default: True.
alpha (float): Transparency of bounding boxes. Default: 0.8.
Returns:
matplotlib.Axes: The result axes.
ndarray: The result image.
"""
taken_colors = {(0, 0, 0)}  # reserve black so no mask is drawn pure black
if color is None:
random_colors = np.random.randint(0, 255, (len(masks), 3))  # masks is an ndarray, so use len()
color = [tuple(c) for c in random_colors]
color = np.array(color, dtype=np.uint8)
polygons = []
for i, mask in enumerate(masks):
if with_edge:
contours, _ = bitmap_to_polygon(mask) # convert the mask bitmap into polygon contours
polygons += [Polygon(c) for c in contours]
color_mask = color[i]
while tuple(color_mask) in taken_colors:
color_mask = _get_bias_color(color_mask)
taken_colors.add(tuple(color_mask))
mask = mask.astype(bool)
img[mask] = img[mask] * (1 - alpha) + color_mask * alpha # alpha-blend the mask color onto the image
p = PatchCollection(
polygons, facecolor='none', edgecolors='w', linewidths=1, alpha=0.8)
ax.add_collection(p)
return ax, img
def imshow_det_bboxes(img,
bboxes=None,
labels=None,
segms=None,
class_names=None,
score_thr=0,
bbox_color='green',
text_color='green',
mask_color=None,
thickness=2,
font_size=8,
win_name='',
show=True,
wait_time=0,
out_file=None):
"""Draw bboxes and class labels (with scores) on an image.
Args:
img (str | ndarray): The image to be displayed.
bboxes (ndarray): Bounding boxes (with scores), shaped (n, 4) or
(n, 5).
labels (ndarray): Labels of bboxes.
segms (ndarray | None): Masks, shaped (n,h,w) or None.
class_names (list[str]): Names of each class.
score_thr (float): Minimum score of bboxes to be shown. Default: 0.
bbox_color (list[tuple] | tuple | str | None): Colors of bbox lines.
If a single color is given, it will be applied to all classes.
The tuple of color should be in RGB order. Default: 'green'.
text_color (list[tuple] | tuple | str | None): Colors of texts.
If a single color is given, it will be applied to all classes.
The tuple of color should be in RGB order. Default: 'green'.
mask_color (list[tuple] | tuple | str | None, optional): Colors of
masks. If a single color is given, it will be applied to all
classes. The tuple of color should be in RGB order.
Default: None.
thickness (int): Thickness of lines. Default: 2.
font_size (int): Font size of texts. Default: 13.
show (bool): Whether to show the image. Default: True.
win_name (str): The window name. Default: ''.
wait_time (float): Value of waitKey param. Default: 0.
out_file (str, optional): The filename to write the image.
Default: None.
Returns:
ndarray: The image with bboxes drawn on it.
"""
# Debug prints added to inspect the actual inputs;
# the values in the comments come from one example run.
print('bboxes=', bboxes) # (N, 5) boxes: x1, y1, x2, y2, score
print('labels=', labels) # N labels (all class 0 here): [0 0 0 0 0 0 0]
print('segms=', segms.shape, segms) # e.g. shape (7, 1920, 1080), boolean values
print('class_names=', class_names) # ('t',)
print('score_thr=', score_thr) # 0.3
print('bbox_color=', bbox_color) # PALETTE
print('text_color=', text_color) # PALETTE
print('mask_color=', mask_color) # PALETTE, [(220, 20, 60), (119, 11, 32), (0, 0, 142)...
print('thickness=', thickness) # 2
print('font_size=', font_size) # 13
print('win_name=', win_name) # '' (empty)
print('show=', show) # False
print('wait_time=', wait_time) # 0
print('out_file=', out_file) # work_dirs/result/1564891439058dc2707b797.jpg
assert bboxes is None or bboxes.ndim == 2, \
f' bboxes ndim should be 2, but its ndim is {bboxes.ndim}.'
assert labels.ndim == 1, \
f' labels ndim should be 1, but its ndim is {labels.ndim}.'
assert bboxes is None or bboxes.shape[1] == 4 or bboxes.shape[1] == 5, \
f' bboxes.shape[1] should be 4 or 5, but its {bboxes.shape[1]}.'
assert bboxes is None or bboxes.shape[0] <= labels.shape[0], \
'labels.shape[0] should not be less than bboxes.shape[0].'
assert segms is None or segms.shape[0] == labels.shape[0], \
'segms.shape[0] and labels.shape[0] should have the same length.'
assert segms is not None or bboxes is not None, \
'segms and bboxes should not be None at the same time.'
img = mmcv.imread(img).astype(np.uint8)
# filter out bboxes and segms whose score is below the threshold
if score_thr > 0:
assert bboxes is not None and bboxes.shape[1] == 5
scores = bboxes[:, -1]
inds = scores > score_thr
bboxes = bboxes[inds, :]
labels = labels[inds]
if segms is not None:
segms = segms[inds, ...]
img = mmcv.bgr2rgb(img)
width, height = img.shape[1], img.shape[0]
img = np.ascontiguousarray(img) # make the array memory-contiguous
fig = plt.figure(win_name, frameon=False)
plt.title(win_name)
canvas = fig.canvas
dpi = fig.get_dpi()
# add a small EPS to avoid precision lost due to matplotlib's truncation
# (https://github.com/matplotlib/matplotlib/issues/15363)
fig.set_size_inches((width + EPS) / dpi, (height + EPS) / dpi)
# remove white edges by set subplot margin
plt.subplots_adjust(left=0, right=1, bottom=0, top=1)
ax = plt.gca()
ax.axis('off')
max_label = int(max(labels) if len(labels) > 0 else 0)
text_palette = palette_val(get_palette(text_color, max_label + 1)) # build the text color palette
text_colors = [text_palette[label] for label in labels]
num_bboxes = 0
if bboxes is not None:
# pick a bbox color per label
num_bboxes = bboxes.shape[0]
bbox_palette = palette_val(get_palette(bbox_color, max_label + 1))
colors = [bbox_palette[label] for label in labels[:num_bboxes]]
# draw the boxes
draw_bboxes(ax, bboxes, colors, alpha=0.8, thickness=thickness)
horizontal_alignment = 'left'
positions = bboxes[:, :2].astype(np.int32) + thickness
areas = (bboxes[:, 3] - bboxes[:, 1]) * (bboxes[:, 2] - bboxes[:, 0])
scales = _get_adaptive_scales(areas)
scores = bboxes[:, 4] if bboxes.shape[1] == 5 else None
# draw the labels
draw_labels(
ax,
labels[:num_bboxes],
positions,
scores=scores,
class_names=class_names,
color=text_colors,
font_size=font_size,
scales=scales,
horizontal_alignment=horizontal_alignment)
if segms is not None:
# pick a mask color per label
mask_palette = get_palette(mask_color, max_label + 1)
colors = [mask_palette[label] for label in labels]
colors = np.array(colors, dtype=np.uint8)
# draw the masks
draw_masks(ax, img, segms, colors, with_edge=True)
if num_bboxes < segms.shape[0]:
segms = segms[num_bboxes:]
horizontal_alignment = 'center'
areas = []
positions = []
for mask in segms:
_, _, stats, centroids = cv2.connectedComponentsWithStats(
mask.astype(np.uint8), connectivity=8)
largest_id = np.argmax(stats[1:, -1]) + 1
positions.append(centroids[largest_id])
areas.append(stats[largest_id, -1])
areas = np.stack(areas, axis=0)
scales = _get_adaptive_scales(areas)
# draw labels for the remaining masks (those without boxes)
draw_labels(
ax,
labels[num_bboxes:],
positions,
class_names=class_names,
color=text_colors,
font_size=font_size,
scales=scales,
horizontal_alignment=horizontal_alignment)
plt.imshow(img)
stream, _ = canvas.print_to_buffer()
buffer = np.frombuffer(stream, dtype='uint8')
if sys.platform == 'darwin':
width, height = canvas.get_width_height(physical=True)
img_rgba = buffer.reshape(height, width, 4)
rgb, alpha = np.split(img_rgba, [3], axis=2)
img = rgb.astype('uint8')
img = mmcv.rgb2bgr(img)
if show:
# We do not use cv2 for display because in some cases, opencv will
# conflict with Qt, it will output a warning: Current thread
# is not the object's thread. You can refer to
# https://github.com/opencv/opencv-python/issues/46 for details
if wait_time == 0:
plt.show()
else:
plt.show(block=False)
plt.pause(wait_time)
if out_file is not None:
mmcv.imwrite(img, out_file)
plt.close()
return img
def imshow_gt_det_bboxes(img,
annotation,
result,
class_names=None,
score_thr=0,
gt_bbox_color=(61, 102, 255),
gt_text_color=(200, 200, 200),
gt_mask_color=(61, 102, 255),
det_bbox_color=(241, 101, 72),
det_text_color=(200, 200, 200),
det_mask_color=(241, 101, 72),
thickness=2,
font_size=13,
win_name='',
show=True,
wait_time=0,
out_file=None,
overlay_gt_pred=True):
"""General visualization GT and result function.
Args:
img (str | ndarray): The image to be displayed.
annotation (dict): Ground truth annotations where contain keys of
'gt_bboxes' and 'gt_labels' or 'gt_masks'.
result (tuple[list] | list): The detection result, can be either
(bbox, segm) or just bbox.
class_names (list[str]): Names of each classes.
score_thr (float): Minimum score of bboxes to be shown. Default: 0.
gt_bbox_color (list[tuple] | tuple | str | None): Colors of bbox lines.
If a single color is given, it will be applied to all classes.
The tuple of color should be in RGB order. Default: (61, 102, 255).
gt_text_color (list[tuple] | tuple | str | None): Colors of texts.
If a single color is given, it will be applied to all classes.
The tuple of color should be in RGB order. Default: (200, 200, 200).
gt_mask_color (list[tuple] | tuple | str | None, optional): Colors of
masks. If a single color is given, it will be applied to all classes.
The tuple of color should be in RGB order. Default: (61, 102, 255).
det_bbox_color (list[tuple] | tuple | str | None):Colors of bbox lines.
If a single color is given, it will be applied to all classes.
The tuple of color should be in RGB order. Default: (241, 101, 72).
det_text_color (list[tuple] | tuple | str | None):Colors of texts.
If a single color is given, it will be applied to all classes.
The tuple of color should be in RGB order. Default: (200, 200, 200).
det_mask_color (list[tuple] | tuple | str | None, optional): Color of
masks. If a single color is given, it will be applied to all classes.
The tuple of color should be in RGB order. Default: (241, 101, 72).
thickness (int): Thickness of lines. Default: 2.
font_size (int): Font size of texts. Default: 13.
win_name (str): The window name. Default: ''.
show (bool): Whether to show the image. Default: True.
wait_time (float): Value of waitKey param. Default: 0.
out_file (str, optional): The filename to write the image.
Default: None.
overlay_gt_pred (bool): Whether to plot gts and predictions on the
same image. If False, predictions and gts will be plotted on two
copies of the image and concatenated vertically: the upper image
is drawn with the gt, and the lower one with the prediction
result. Default: True.
Returns:
ndarray: The image with bboxes or masks drawn on it.
"""
assert 'gt_bboxes' in annotation
assert 'gt_labels' in annotation
assert isinstance(result, (tuple, list, dict)), 'Expected ' \
f'tuple or list or dict, but get {type(result)}'
gt_bboxes = annotation['gt_bboxes']
gt_labels = annotation['gt_labels']
gt_masks = annotation.get('gt_masks', None)
if gt_masks is not None:
gt_masks = mask2ndarray(gt_masks)
gt_seg = annotation.get('gt_semantic_seg', None)
if gt_seg is not None:
pad_value = 255 # the padding value of gt_seg
sem_labels = np.unique(gt_seg)
all_labels = np.concatenate((gt_labels, sem_labels), axis=0)
all_labels, counts = np.unique(all_labels, return_counts=True)
stuff_labels = all_labels[np.logical_and(counts < 2,
all_labels != pad_value)]
stuff_masks = gt_seg[None] == stuff_labels[:, None, None]
gt_labels = np.concatenate((gt_labels, stuff_labels), axis=0)
gt_masks = np.concatenate((gt_masks, stuff_masks.astype(np.uint8)),
axis=0)
# To hide the GT bounding boxes, uncomment the following line
# gt_bboxes = None
img = mmcv.imread(img)
img_with_gt = imshow_det_bboxes(
img,
gt_bboxes,
gt_labels,
gt_masks,
class_names=class_names,
bbox_color=gt_bbox_color,
text_color=gt_text_color,
mask_color=gt_mask_color,
thickness=thickness,
font_size=font_size,
win_name=win_name,
show=False)
if not isinstance(result, dict):
if isinstance(result, tuple):
bbox_result, segm_result = result
if isinstance(segm_result, tuple):
segm_result = segm_result[0] # ms rcnn
else:
bbox_result, segm_result = result, None
bboxes = np.vstack(bbox_result)
labels = [
np.full(bbox.shape[0], i, dtype=np.int32)
for i, bbox in enumerate(bbox_result)
]
labels = np.concatenate(labels)
segms = None
if segm_result is not None and len(labels) > 0: # non empty
segms = mmcv.concat_list(segm_result)
segms = mask_util.decode(segms)
segms = segms.transpose(2, 0, 1)
else:
assert class_names is not None, 'We need to know the number ' \
'of classes.'
VOID = len(class_names)
bboxes = None
pan_results = result['pan_results']
# keep objects ahead
ids = np.unique(pan_results)[::-1]
legal_indices = ids != VOID
ids = ids[legal_indices]
labels = np.array([id % INSTANCE_OFFSET for id in ids], dtype=np.int64)
segms = (pan_results[None] == ids[:, None, None])
if overlay_gt_pred:
img = imshow_det_bboxes(
img_with_gt,
bboxes,
labels,
segms=segms,
class_names=class_names,
score_thr=score_thr,
bbox_color=det_bbox_color,
text_color=det_text_color,
mask_color=det_mask_color,
thickness=thickness,
font_size=font_size,
win_name=win_name,
show=show,
wait_time=wait_time,
out_file=out_file)
else:
img_with_det = imshow_det_bboxes(
img,
bboxes,
labels,
segms=segms,
class_names=class_names,
score_thr=score_thr,
bbox_color=det_bbox_color,
text_color=det_text_color,
mask_color=det_mask_color,
thickness=thickness,
font_size=font_size,
win_name=win_name,
show=False)
img = np.concatenate([img_with_gt, img_with_det], axis=0)
plt.imshow(img)
if show:
if wait_time == 0:
plt.show()
else:
plt.show(block=False)
plt.pause(wait_time)
if out_file is not None:
mmcv.imwrite(img, out_file)
plt.close()
return img
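A quick way to exercise imshow_det_bboxes is to feed it dummy data. The snippet below is a minimal smoke test; the image, box, class name, and output path are made-up example values, not anything from mmdetection itself:
import numpy as np
from mmdet.core.visualization.image import imshow_det_bboxes

img = np.full((240, 320, 3), 255, dtype=np.uint8)  # blank white BGR image
bboxes = np.array([[30., 40., 150., 180., 0.92]], dtype=np.float32)  # x1, y1, x2, y2, score
labels = np.array([0], dtype=np.int64)
segms = np.zeros((1, 240, 320), dtype=bool)
segms[0, 40:180, 30:150] = True  # box-shaped dummy mask

imshow_det_bboxes(
    img, bboxes, labels, segms=segms,
    class_names=('table',), score_thr=0.3,
    show=False, out_file='work_dirs/vis_demo.jpg')  # hypothetical output path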
9、Dataset settings
coco_instance.py is shown below:
# dataset settings
dataset_type = 'CocoDataset'
data_root = 'data/coco/'
img_norm_cfg = dict(
mean=[123.675, 116.28, 103.53], std=[58.395, 57.12, 57.375], to_rgb=True) # input normalization: subtract the mean and divide by the standard deviation; to_rgb converts BGR to RGB
train_pipeline = [
dict(type='LoadImageFromFile'),
dict(type='LoadAnnotations', with_bbox=True, with_mask=True),
# For how img_scale works with keep_ratio, see https://zhuanlan.zhihu.com/p/141176981
# max_long_edge = max(img_scale)
# max_short_edge = min(img_scale)
# Whichever ratio (long value / long edge, short value / short edge) is smaller
# determines the scale:
# scale_factor = min(max_long_edge / max(h, w), max_short_edge / min(h, w))
# (a worked example follows this config file)
dict(type='Resize', img_scale=(1333, 800), keep_ratio=True), # long edge capped at 1333, short edge at 800, aspect ratio preserved
dict(type='RandomFlip', flip_ratio=0.5),
dict(type='Normalize', **img_norm_cfg),
dict(type='Pad', size_divisor=32), # pad H and W to multiples of size_divisor=32 so no features are lost to rounding during repeated downsampling
dict(type='DefaultFormatBundle'),
dict(type='Collect', keys=['img', 'gt_bboxes', 'gt_labels', 'gt_masks']),
]
test_pipeline = [
dict(type='LoadImageFromFile'),
dict(
type='MultiScaleFlipAug',
img_scale=(1333, 800),
flip=False,
transforms=[
dict(type='Resize', keep_ratio=True),
dict(type='RandomFlip'),
dict(type='Normalize', **img_norm_cfg),
dict(type='Pad', size_divisor=32),
dict(type='ImageToTensor', keys=['img']),
dict(type='Collect', keys=['img']),
])
]
data = dict(
samples_per_gpu=2,
workers_per_gpu=2, # number of dataloader worker subprocesses per GPU
train=dict(
type=dataset_type,
ann_file=data_root + 'annotations/instances_train2017.json',
img_prefix=data_root + 'train2017/',
pipeline=train_pipeline),
val=dict(
type=dataset_type,
ann_file=data_root + 'annotations/instances_val2017.json',
img_prefix=data_root + 'val2017/',
pipeline=test_pipeline),
test=dict(
type=dataset_type,
ann_file=data_root + 'annotations/instances_val2017.json', # val split, matching img_prefix below
img_prefix=data_root + 'val2017/',
pipeline=test_pipeline))
evaluation = dict(metric=['bbox', 'segm'])
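As promised above, a worked example of the keep_ratio resize rule: for a 1080×1920 input, the long-edge constraint wins and the image is resized to 750×1333. This is a pure-Python sketch of the formula, not the mmcv implementation itself:
def keep_ratio_scale(h, w, img_scale=(1333, 800)):
    """Scale factor used by Resize(keep_ratio=True)."""
    max_long_edge, max_short_edge = max(img_scale), min(img_scale)
    # whichever ratio is smaller constrains the resize
    return min(max_long_edge / max(h, w), max_short_edge / min(h, w))

h, w = 1080, 1920
s = keep_ratio_scale(h, w)
print(round(s, 4), round(h * s), round(w * s))  # 0.6943 750 1333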
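When adapting this dataset config to your own data, the usual mmdetection pattern is to inherit the base file and override only the fields that change; mmcv merges the nested dicts. A sketch, where the file name and all data paths are placeholders:
# my_dataset_config.py -- hypothetical override file
_base_ = '../_base_/datasets/coco_instance.py'
data = dict(
    samples_per_gpu=2,
    train=dict(
        ann_file='data/my_dataset/annotations/train.json',
        img_prefix='data/my_dataset/train/'),
    val=dict(
        ann_file='data/my_dataset/annotations/val.json',
        img_prefix='data/my_dataset/val/'),
    test=dict(
        ann_file='data/my_dataset/annotations/val.json',
        img_prefix='data/my_dataset/val/'))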