DiffusionDet : DiffusionTracker复习回顾
之前的一段时间学习了基于扩散模型的检测于跟踪算法,最近在忙别的事情就导致了这里存在了很多和细节上的遗忘在这里进行一定的回顾,之后在试图看看可以进一步学习基于点集的扩散过程吗?
DiffusionDet代码回顾
-
这是一个新的框架,它将对象检测表述为从噪声框到对象框的去噪扩散过程
-
在训练阶段,对象框从真实框GT扩散到随机分布,并且模型学习逆转这种噪声过程。恢复到真实框通过损失函数来进行训练。
-
在推理阶段,模型以渐进的方式将一组随机生成的框细化为输出结果。也就是对一张随机加噪通过模型去噪来生成最终的预测框。
这里先通过走一遍代码的断点调试流程,来回顾一些整个DiffusionDet之前学过的知识。
我们忽略加载参数的部分直接从模型加载的部分看起。
模型构建部分
demo = VisualizationDemo(cfg) # 构建检测网络
依次执行这些函数构建网络模型(打好断点调试)
self.predictor = DefaultPredictor(cfg)
self.model = build_model(self.cfg)
model = META_ARCH_REGISTRY.get(meta_arch)(cfg)
比较关键的一个部分构建DiffusionDet的整体结构。
我们首先要通过断点调试回顾的是在定义结构的部分第一个部分主干网络的中间特征层定义了
- p2 p3 p4 p5这几个特征层
- num_proposal的数量为100也就是每次加噪的时候加100个噪声框
- 类别80 coco数据集 多头6头
# Build Backbone.
self.backbone = build_backbone(cfg)
backbone = BACKBONE_REGISTRY.get(backbone_name)(cfg, input_shape)
- 根据深度参数构造restnet50的底部主干网络部分。
bottom_up = build_resnet_backbone(cfg, input_shape)
- 构建除了FPN结构之外的主干网络部分(也就是其参数文件中配置的RestNet50的结构)
-
7x7步长为2的卷积下采样2倍。
-
定义一些网络模型的参数便于进行构造和存储。
-
ResNet-50 模型的块数量将是 [3, 4, 6, 3],这意味着:第一阶段有 3 个残差块第二阶段有 4 个残差块第三阶段有 6 个残差块第四阶段有 3 个残差块。
num_blocks_per_stage = {
18: [2, 2, 2, 2],
34: [3, 4, 6, 3],
50: [3, 4, 6, 3],
101: [3, 4, 23, 3],
152: [3, 8, 36, 3],
}[depth]
stage_kargs = { # 包含了构建每个阶段所需的参数
"num_blocks": num_blocks_per_stage[idx],
"stride_per_block": [first_stride] + [1] * (num_blocks_per_stage[idx] - 1),
"in_channels": in_channels,
"out_channels": out_channels,
"norm": norm,
}
根据参数值构建多层的BottleneckBlock结构完成循环即可的典型结构包括三个卷积层:
- 1x1 卷积层:首先使用一个 1x1 卷积层来减少输入的通道数。这一步的目的是降低计算量,因为 1x1 卷积的参数数量远少于 3x3 卷积。
- 3x3 卷积层:接着是一个 3x3 卷积层,用于提取空间特征。这是瓶颈块的核心,用于提取图像中的细节信息。
- 1x1 卷积层:最后,使用另一个 1x1 卷积层将通道数恢复到原始大小,以便于与输入进行相加。这一层通常被称为扩展层,因为它将特征图的通道数增加到一个更大的值,通常是原始通道数的四倍
构造完的结构如下所示,完成特征提取网络的返回
bottom_up = build_resnet_backbone(cfg, input_shape)
连接一个FPN的结构完成backbone的构造。
backbone = FPN(
bottom_up=bottom_up, # 传入restnet50的结构
in_features=in_features, # 特征层名称
out_channels=out_channels, # 中间的输出通道
norm=cfg.MODEL.FPN.NORM,
top_block=LastLevelMaxPool(), # 一个最大池化层
fuse_type=cfg.MODEL.FPN.FUSE_TYPE,
)
根据这个图走一遍断点调试过程中的这些模块部分即可。、
- 获取左侧下采样部分的结构。即下采样 4倍 8倍 16倍和32倍的特征层输出。
input_shapes = bottom_up.output_shape() # 获取下采样的结构
- 循环构造两个部分的模型,第一个是中间的1x1的卷积的部分,用来调整整个通道数为256 第二个是最后的经过上采样后要经过的3x3的卷积结构。
lateral_conv = Conv2d(
in_channels, out_channels, kernel_size=1, bias=use_bias, norm=lateral_norm
)
output_conv = Conv2d(
out_channels,
out_channels,
kernel_size=3,
stride=1,
padding=1,
bias=use_bias,
norm=output_norm,
)
循环分别加入到两个变量中来进行存储。
self.add_module(“fpn_lateral{}”.format(stage), lateral_conv)
self.add_module(“fpn_output{}”.format(stage), output_conv)
- 加入之前的bottom_up结构部分以及最大池化的部分构建完成论文中的Image Encode部分。
之后构建DiffusionDet模型的主要操作分为了两个部分,一个是加载和配置一些加噪去噪过程中的参数。第二是结合以前的一个项目构建出Decode的结构部分。
class DiffusionDet(nn.Module):
"""
Implement DiffusionDet
"""
def __init__(self, cfg):
super().__init__()
self.device = torch.device(cfg.MODEL.DEVICE)
self.in_features = cfg.MODEL.ROI_HEADS.IN_FEATURES
self.num_classes = cfg.MODEL.DiffusionDet.NUM_CLASSES
self.num_proposals = cfg.MODEL.DiffusionDet.NUM_PROPOSALS
self.hidden_dim = cfg.MODEL.DiffusionDet.HIDDEN_DIM
self.num_heads = cfg.MODEL.DiffusionDet.NUM_HEADS
# Build Backbone.
self.backbone = build_backbone(cfg)
self.size_divisibility = self.backbone.size_divisibility
# build diffusion(构建扩散模型的相关参数)
timesteps = 1000
sampling_timesteps = cfg.MODEL.DiffusionDet.SAMPLE_STEP # 模型的采样过程中需要执行的步骤数
self.objective = 'pred_x0'
betas = cosine_beta_schedule(timesteps) # 余弦退火调整学习率
alphas = 1. - betas
alphas_cumprod = torch.cumprod(alphas, dim=0) # 函数计算 alphas 的累积乘积,得到 alphas_cumprod
alphas_cumprod_prev = F.pad(alphas_cumprod[:-1], (1, 0), value=1.) # 它在序列的开头添加一个值为1的元素 确保在计算逆扩散过程中的累积乘积时保持正确的比例
timesteps, = betas.shape
self.num_timesteps = int(timesteps)
self.sampling_timesteps = default(sampling_timesteps, timesteps)
assert self.sampling_timesteps <= timesteps
self.is_ddim_sampling = self.sampling_timesteps < timesteps # 是否使用 (DDIM) 采样方法 配置的采样时间步 sampling_timesteps 小于模型的总时间步 timesteps,则使用 DDIM 采样
self.ddim_sampling_eta = 1. # 这行代码设置了 DDIM 采样中的 η 参数,η 是一个超参数,用于控制采样过程中的噪声水平。在这里,η 被设置为 1.0,这意味着在采样过程中不进行噪声调整
self.self_condition = False
self.scale = cfg.MODEL.DiffusionDet.SNR_SCALE # 取信噪比(SNR)的缩放比例
self.box_renewal = True #是否在模型的推理过程中进行框更新
self.use_ensemble = True
self.register_buffer('betas', betas)
self.register_buffer('alphas_cumprod', alphas_cumprod)
self.register_buffer('alphas_cumprod_prev', alphas_cumprod_prev)
# calculations for diffusion q(x_t | x_{t-1}) and others
self.register_buffer('sqrt_alphas_cumprod', torch.sqrt(alphas_cumprod))
self.register_buffer('sqrt_one_minus_alphas_cumprod', torch.sqrt(1. - alphas_cumprod))
self.register_buffer('log_one_minus_alphas_cumprod', torch.log(1. - alphas_cumprod))
self.register_buffer('sqrt_recip_alphas_cumprod', torch.sqrt(1. / alphas_cumprod))
self.register_buffer('sqrt_recipm1_alphas_cumprod', torch.sqrt(1. / alphas_cumprod - 1))
# calculations for posterior q(x_{t-1} | x_t, x_0)
posterior_variance = betas * (1. - alphas_cumprod_prev) / (1. - alphas_cumprod)
# above: equal to 1. / (1. / (1. - alpha_cumprod_tm1) + alpha_t / beta_t)
self.register_buffer('posterior_variance', posterior_variance)
# below: log calculation clipped because the posterior variance is 0 at the beginning of the diffusion chain
self.register_buffer('posterior_log_variance_clipped', torch.log(posterior_variance.clamp(min=1e-20)))
self.register_buffer('posterior_mean_coef1', betas * torch.sqrt(alphas_cumprod_prev) / (1. - alphas_cumprod))
self.register_buffer('posterior_mean_coef2',
(1. - alphas_cumprod_prev) * torch.sqrt(alphas) / (1. - alphas_cumprod))
# Build Dynamic Head.
self.head = DynamicHead(cfg=cfg, roi_input_shape=self.backbone.output_shape())
# Loss parameters:
class_weight = cfg.MODEL.DiffusionDet.CLASS_WEIGHT
giou_weight = cfg.MODEL.DiffusionDet.GIOU_WEIGHT
l1_weight = cfg.MODEL.DiffusionDet.L1_WEIGHT
no_object_weight = cfg.MODEL.DiffusionDet.NO_OBJECT_WEIGHT
self.deep_supervision = cfg.MODEL.DiffusionDet.DEEP_SUPERVISION
self.use_focal = cfg.MODEL.DiffusionDet.USE_FOCAL
self.use_fed_loss = cfg.MODEL.DiffusionDet.USE_FED_LOSS
self.use_nms = cfg.MODEL.DiffusionDet.USE_NMS
# Build Criterion. 将预测的边界框与真实边界框进行匹配。匈牙利匹配器是一种解决分配问题的方法,它能够找到成本最小的匹配方式
matcher = HungarianMatcherDynamicK(
cfg=cfg, cost_class=class_weight, cost_bbox=l1_weight, cost_giou=giou_weight, use_focal=self.use_focal
) # cost_bbox=l1_weightL1权重,用于指定边界框坐标预测的成本
weight_dict = {"loss_ce": class_weight, "loss_bbox": l1_weight, "loss_giou": giou_weight}
if self.deep_supervision:
aux_weight_dict = {}
for i in range(self.num_heads - 1):
aux_weight_dict.update({k + f"_{i}": v for k, v in weight_dict.items()})
weight_dict.update(aux_weight_dict)
losses = ["labels", "boxes"]
self.criterion = SetCriterionDynamicK(
cfg=cfg, num_classes=self.num_classes, matcher=matcher, weight_dict=weight_dict, eos_coef=no_object_weight,
losses=losses, use_focal=self.use_focal,) # 构造损失函数
pixel_mean = torch.Tensor(cfg.MODEL.PIXEL_MEAN).to(self.device).view(3, 1, 1)
pixel_std = torch.Tensor(cfg.MODEL.PIXEL_STD).to(self.device).view(3, 1, 1)
self.normalizer = lambda x: (x - pixel_mean) / pixel_std
self.to(self.device)
构造decode的部分或者说是去噪头的部分
# Build Dynamic Head.
self.head = DynamicHead(cfg=cfg, roi_input_shape=self.backbone.output_shape())
- 通过构建一个ROI pooling层通过输出的特征图在上面进行区域兴趣卷积得到。固定大小为7x7的特征区域之后应该是选择100个作为候选的噪声框进行去噪的过程。
# Build RoI.
box_pooler = self._init_box_pooler(cfg, roi_input_shape)
self.box_pooler = box_pooler
加载个配置一些参数。
构造RCNN结构选择候选区域并进行处理
rcnn_head = RCNNHead(cfg, d_model, num_classes, dim_feedforward, nhead, dropout, activation)
检测解码器在一个检测头中有 6 个阶段,遵循 DETR 和 Sparse R-CNN。 此外,DiffusionDet
# dynamic.
self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout) # 8头的注意力机制模块
self.inst_interact = DynamicConv(cfg) # 动态卷积层
self.linear1 = nn.Linear(d_model, dim_feedforward)
self.dropout = nn.Dropout(dropout)
self.linear2 = nn.Linear(dim_feedforward, d_model)
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
self.norm3 = nn.LayerNorm(d_model)
self.dropout1 = nn.Dropout(dropout)
self.dropout2 = nn.Dropout(dropout)
self.dropout3 = nn.Dropout(dropout)
self.activation = _get_activation_fn(activation)
# block time mlp
self.block_time_mlp = nn.Sequential(nn.SiLU(), nn.Linear(d_model * 4, d_model * 2))
断点调试的一个动态结果。
最后的一个部分就加上提取分类信息和回归信息的结构,在加上两个分类头和回归头就完成了目标检测模型的构建。
- 80个类别
- 4个的坐标位置
# cls.
num_cls = cfg.MODEL.DiffusionDet.NUM_CLS
cls_module = list()
for _ in range(num_cls):
cls_module.append(nn.Linear(d_model, d_model, False))
cls_module.append(nn.LayerNorm(d_model))
cls_module.append(nn.ReLU(inplace=True))
self.cls_module = nn.ModuleList(cls_module)
# reg.
num_reg = cfg.MODEL.DiffusionDet.NUM_REG
reg_module = list()
for _ in range(num_reg):
reg_module.append(nn.Linear(d_model, d_model, False))
reg_module.append(nn.LayerNorm(d_model))
reg_module.append(nn.ReLU(inplace=True))
self.reg_module = nn.ModuleList(reg_module)
整个模型的构建结构。
前向传播进行检测部分
predictions, visualized_output = demo.run_on_image(img)
predictions = self.predictor(image) #
对图像进行预处理的部分。
执行前向传播的一个最关键的函数。
predictions = self.model([inputs])[0] # 获取模型输出的第一个元素
- backbone的上部分提取出5个特征图,4个基础上在下采样得到其中的一个。
# Feature Extraction.特征提取的部分
src = self.backbone(images.tensor) # FPN最终提取的5个不同尺度的特征图信息
results.extend(self.top_block(top_block_in_feature)) # 在进行一次下采样得到p6
但特征里面只会保留4个也就是RestNet50提取的4个部分。
- 因为我们执行的是前向的推理过程,所以需要的是ddim这一个函数最终返回检测的结果。
results = self.ddim_sample(batched_inputs, features, images_whwh, images) #DDIM采样,这是一种逐步从噪声中恢复出清晰图像的技术 ddim_sample(关键)
我们对这个部分进行细化就可以执行完DiffusionDet的所有关键的过程流程了。
- 分离好时间节点(源码中这一个部分它的配置项为1也就是一次的去噪就可以完成不用和ddpm一样多次的去噪。)
# [-1, 0, 1, 2, ..., T-1] when sampling_timesteps == total_timesteps objective:目标函数 生成时间序列通常用于控制去噪扩散隐马尔可夫模型(DDIM)
times = torch.linspace(-1, total_timesteps - 1, steps=sampling_timesteps + 1) # sampling_timesteps + 1个时间点
times = list(reversed(times.int().tolist())) # 时间为倒序 从T到0
time_pairs = list(zip(times[:-1], times[1:])) # [(T-1, T-2), (T-2, T-3), ..., (1, 0), (0, -1)]
- 根据候选框的数为100从而生成100个噪声框。定义用来存储结果的变量。进行循环进行去噪恢复出目标框的处理
img = torch.randn(shape, device=self.device) # 产生标准高斯分布bboxs 用来加噪的边界框扰动
ensemble_score, ensemble_label, ensemble_coord = [], [], [] # 置信度分数 类别标签 坐标
for time, time_next in time_pairs: # 相邻时间两步计算
5. 将提取出来的四个特征层输入到于类似DETR带有注意力机制的的第二部分返回输出预测结果
outputs_class, outputs_coord = self.head(backbone_feats, x_boxes, t, None)
preds, outputs_class, outputs_coord = self.model_predictions(backbone_feats, images_whwh, img, time_cond,
self_cond, clip_x_start=clip_denoised) # 预测的噪声、x_0和类别与坐标
def model_predictions(self, backbone_feats, images_whwh, x, t, x_self_cond=None, clip_x_start=False):
x_boxes = torch.clamp(x, min=-1 * self.scale, max=self.scale)
x_boxes = ((x_boxes / self.scale) + 1) / 2 # 执行论文伪代码中的尺度缩放的部分 x是初始化的噪声值
x_boxes = box_cxcywh_to_xyxy(x_boxes)
x_boxes = x_boxes * images_whwh[:, None, :]
outputs_class, outputs_coord = self.head(backbone_feats, x_boxes, t, None) #得到预测的类别和标签 outputs_coord==pb_pred(也就是先通过decode得到的一个预测值)
# 将预测的边界框从绝对坐标转换为相对于图像尺寸的比例坐标,然后进行缩放和裁剪以确保坐标在有效的范围内
x_start = outputs_coord[-1] # (batch, num_proposals, 4) predict boxes: absolute coordinates (x1, y1, x2, y2) 输出的坐标中提取最后一个时间步的预测边界框
x_start = x_start / images_whwh[:, None, :] # 归一化
x_start = box_xyxy_to_cxcywh(x_start) # 通过计算边界框的中心点和宽度高度来实现的
x_start = (x_start * 2 - 1.) * self.scale
x_start = torch.clamp(x_start, min=-1 * self.scale, max=self.scale) # 裁剪坐标 函数确保所有坐标都在[-1 * self.scale, self.scale]的范围内。这可以防止坐标超出有效范围
pred_noise = self.predict_noise_from_start(x, t, x_start) # 用于根据当前状态和时间步预测噪声(也就是通过模型预测噪声来进行去噪的过程) pred_noise用于存储预测的噪声值
return ModelPrediction(pred_noise, x_start), outputs_class, outputs_coord
6. 根据返回的预测结果,带入扩散模型训练得到的参数中,给出预测的噪声,之后通过迭代的去噪操作完成整个过程。
pred_noise = self.predict_noise_from_start(x, t, x_start) # 用于根据当前状态和时间步预测噪声(也就是通过模型预测噪声来进行去噪的过程) pred_noise用于存储预测的噪声值
return ModelPrediction(pred_noise, x_start), outputs_class, outputs_coord # 将预测的噪声和原来的噪声作为一个集合的形式进行返回
pred_noise, x_start = preds.pred_noise, preds.pred_x_start #还原回来
- 通过一个过滤操作对低置信度的得分框进行替换。同时结合DDIM的操得到最后的输出结果。
if self.box_renewal: # filter reneral机制 将置信度低的边界框用随机框替换
score_per_image, box_per_image = outputs_class[-1][0], outputs_coord[-1][0] # 提取批次中的第一个图像的输出
threshold = 0.5
score_per_image = torch.sigmoid(score_per_image) # 计算分数概率
value, _ = torch.max(score_per_image, -1, keepdim=False) # 最后一个维度(类别维度)上找到每个提议的最大分数,并返回最大分数的值和索引
keep_idx = value > threshold
num_remain = torch.sum(keep_idx) # 计算超过阈值的提议数量,即筛选后剩余的提议数量
pred_noise = pred_noise[:, keep_idx, :] # keep_idx来筛选pred_noise张量,仅保留那些满足阈值条件的预测噪声
x_start = x_start[:, keep_idx, :] # x_start张量,保留那些与筛选后的预测噪声相对应的初始边界框
img = img[:, keep_idx, :] # 这行代码进一步筛选img张量,保留那些与筛选后的预测噪声和初始边界框相对应的图像特征
if time_next < 0:
img = x_start
continue
alpha = self.alphas_cumprod[time] # 取当前时间步time的累积乘积alpha
alpha_next = self.alphas_cumprod[time_next] # 这行代码提取下一个时间步time_next的累积乘积alpha_next
sigma = eta * ((1 - alpha / alpha_next) * (1 - alpha_next) / (1 - alpha)).sqrt() # 这行代码计算噪声系数sigma。eta是一个超参数,用于控制噪声的强度。这个公式是基于DDIM模型的噪声调度。
c = (1 - alpha_next - sigma ** 2).sqrt() # 计算系数c,它与预测的噪声和当前时间步的图像有关
noise = torch.randn_like(img)
img = x_start * alpha_next.sqrt() + \
c * pred_noise + \
sigma * noise # 去噪后的图片 这行代码根据DDIM模型的公式更新图像。x_start是初始图像或预测的图像,alpha_next.sqrt()是下一个时间步的平方根累积乘积,c * pred_noise是预测的噪声,sigma * noise是随机噪声。这个公式结合了这些组件以生成当前时间步的图像。
之后就是逐步向前返回模型完成最后的检测结束即可。
DiffusionTracker代码回顾
DiffusionTracker的代码部分有很多的地方和DiffusionDet是一样的因此篇幅会小一些。自己的理解过程也会弱一些。