前言:
基于BEV的稠密融合算法或许并不是最优的多摄融合感知框架。同时特征级的多摄融合也并不等价于BEV。这两年,PETR系列(PETR, PETR-v2, StreamPETR) 也取得了卓越的性能,并且其输出空间是稀疏的。在PETR系列方法中,对于每个instance feature,采用global cross attention来实现多视角的特征融合。由于融合模块计算复杂度仍与特征图尺寸相关,因此其仍然属于稠密算法的范畴,对高分辨率的图像特征输入不够友好。
因此,我们希望实现一个高性能高效率的长时序纯稀疏融合感知算法,一方面能加速2D->3D 的转换效率,另外一方面在图像空间直接捕获目标跨摄像头的关联关系更加容易,因为在2D->BEV的环节不可避免存在大量信息丢失。这条技术路线代表性的方法是基于deformable attention 的DETR3D算法。然而从开源数据集指标来看,DETR3D的性能距离其他稠密类型的算法存在较大差距。为了Make 纯稀疏感知 Great Again,我们近期提出了Sparse4D及其进化版本Sparse4D v2,从Query构建方式、特征采样方式、特征融合方式、时序融合方式等多个方面提升了模型的效果。当前,Sparse4D V2 在nuScenes detection 3d排行榜来看,达到了SOTA的效果,超越了包括SOLOFusion、BEVFormer v2和StreamPETR在内的一众最新方法,并且在推理效率上也具备显著优势。本文主要介绍了Sparse4D 和 Sparse4D V2 方案的细节实践。
代码部分
v2 与v3 的代码是延续关系,v3 对v2 进行了优化。
我们看看v3 的算子操作顺序:
num_decoder = 6
num_single_frame_decoder = 1
operation_order=(
[
"gnn",
"norm",
"deformable",
"ffn",
"norm",
"refine",
]
* num_single_frame_decoder
+ [
"temp_gnn",
"gnn",
"norm",
"deformable",
"ffn",
"norm",
"refine",
]
* (num_decoder - num_single_frame_decoder)
)[2:]
这个操作顺序是
['deformable', 'ffn', 'norm', 'refine',
'temp_gnn', 'gnn', 'norm', 'deformable', 'ffn', 'norm', 'refine',
'temp_gnn', 'gnn', 'norm', 'deformable', 'ffn', 'norm', 'refine',
'temp_gnn', 'gnn', 'norm', 'deformable', 'ffn', 'norm', 'refine',
'temp_gnn', 'gnn', 'norm', 'deformable', 'ffn', 'norm', 'refine',
'temp_gnn', 'gnn', 'norm', 'deformable', 'ffn', 'norm', 'refine']
比较奇怪的是论文的结构中有交叉注意力的,但是代码中对应的应该是 temp_gnn 。我立即的交叉注意力是:与传统的自注意力(Self-Attention)不同,自注意力是在单个序列内部计算注意力权重,而交叉注意力是在两个不同序列之间计算注意力权重。
对应论文图片
我们看看transform 的结构。sparse4d 的里面编码器没有attention。deformable_aggregation 这个的解读在【Transformer-BEV编码(7)】有说明,以串联的方式将不同模态的特征组合起来。
我们看看BEVFormer,可以看出sparse4d 的结构也是不同的,不大会生成稠密的特征图。
文章目录
- 前言:
- 代码部分
- V2 核心
- 生成anchor
- 1 deformable_aggregation 和相机参数泛化能力差的问题
- 2 temp_graph_model 模块是 MultiheadAttention
- 2 graph_model模块是 MultiheadAttention
- 3 refine
- 4 Dense Depth Supervision
- 5 穿插在操作顺序中的小细节:时序处理bank 和 box编码
- get()
- update 方法
- cache方法
- get_instance_id方法
- SparseBox3DKeyPointsGenerator
- 学习关键点
- 投影函数
- box 编码
- V3 升级
- 去噪的想法
- 调用`self.sampler.update_dn`方法。
- cache_dn的方法
- sample() 是v2 一样的吧
- Quality Estimation
V2 核心
生成anchor
import numpy as np
# 替换'your_file.npy'为你的文件路径和名称
file_path = 'your_file.npy'
# 使用numpy的load函数来读取.npy文件
data = np.load("nuscenes_kmeans900.npy")
# 现在,'data'是一个NumPy数组,包含.npy文件中的数据
print(data)
data[0]
array([ 4.54069267, -18.25305388, -1.63889485, 1.0962088 ,
0.48790321, 0.51029372, 0. , 1. ,
0. , 0. , 0. ])
每个anchor是一个 11 维的数组。一个有900组anchor
1 deformable_aggregation 和相机参数泛化能力差的问题
deformable_aggregation 这个的解读在【Transformer-BEV编码(7)】有说明,以串联的方式将不同模态的特征组合起来。
将相机参数编码纳入可变形聚合。在 Sparse4Dv1 中,Deformable Aggregation 中的权重是通过全连接层计算的。 在训练过程中,相机参数的信息逐渐嵌入到这个全连接层的参数中,这可以看作是隐式神经表示,也可以看作是对训练集的过度拟合。 这种方法可能会导致相机参数的泛化能力较差。
我们直接将相机参数输入到网络中,并将输出空间到图像坐标空间的变换矩阵映射为高维特征向量。 然后,我们将此特征向量添加到实例特征中,并使用组合特征来计算相应视图的权重。
To address this, we directly input camera parameters into the network and map the transformation matrix from output space to image coordinate space into a high-dimensional feature vector. We then add this feature vector to the instance featureand use the combined feature to calculate the weights for the corresponding view.
在V2 中对该该算子的提及:
输入有三个:
- 特征图 I
- 投影点 P
- 权重 W
EDA 可以作为一种多功能运算,适用于需要多图像和多尺度融合的各种应用。
在V3 中的对该算子的提及:
2 temp_graph_model 模块是 MultiheadAttention
temp_graph_model=dict(
type="MultiheadAttention",
embed_dims=embed_dims if not decouple_attn else embed_dims * 2,
num_heads=num_groups,
batch_first=True,
dropout=drop_out,
)
if temporal
else None,
和 graph_model 一样。
2 graph_model模块是 MultiheadAttention
graph_model=dict(
type="MultiheadAttention",
embed_dims=embed_dims if not decouple_attn else embed_dims * 2,
num_heads=num_groups,
batch_first=True,
dropout=drop_out,
),
这个模块没有修改,官方的MultiheadAttention。我们来看看pytorch1.10中MultiheadAttention及TransformerEncoder当中mask的相关代码理解及使用 里面的说明和Transformer源码注解。多头注意力是把 hidden_size 先除以 head_num 再分别做 self_attention ,最后concat在一起。
1、哈佛NLP团队实现的Pytorch版代码链接在这里。
2、主要根据这一份文档来理解。
下图左边是单个注意力机制的示意图。右边是多头注意力机制,就是h个左图同时计算,结果再concat过线性。
def attention(query, key, value, mask=None, dropout=None):
"用于实现注意力机制,下图为注意力公式"
"通常,attention为自注意力,即q、k、v都由同一个输入过线性层得到"
"scores.masked_fill(mask,value)表示用value填充scores中与mask中值为1位置对应的元素。"
d_k = query.size(-1)
scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
if mask is not None:
scores = scores.masked_fill(mask == 0, -1e9)
p_attn = scores.softmax(dim=-1)
if dropout is not None:
p_attn = dropout(p_attn)
return torch.matmul(p_attn, value), p_attn
其实注意力计算通常有两种计算方法,一种就是dot-product attention,一种是additive attention。后者的实现采用一个隐藏层的feed-forward网络实现。前者与本文采用的attention一致,只是最后没有scale即没有乘以
1
d
k
{1}\over\sqrt{d_k}
dk1。
在dk比较小时,两者表现差不多,dk比较大时,没有scale的dot-product表现比不上additive attention。因此在本文中采用scale的方式来弥补。
class MultiHeadedAttention(nn.Module):
def __init__(self, h, d_model, dropout=0.1):
"h表示有几个单注意力机制,本文取8个"
"d_model表示模型的维度本文统一为512"
"单个注意力机制的维度d_k=d_v=d_model/h=512/8=64"
super(MultiHeadedAttention, self).__init__()
assert d_model % h == 0
# d_v 默认恒等于 d_k
self.d_k = d_model // h
self.h = h
#三个线形层用来生QKV,最后一个线性层在concat之后用于输出
self.linears = clones(nn.Linear(d_model, d_model), 4)
self.attn = None
self.dropout = nn.Dropout(p=dropout)
def forward(self, query, key, value, mask=None):
if mask is not None:
#mask的维度[batch_size,seq_len,seq_len]
#qkv的维度[batch_size,seq_len,d_model(512)]
#qkv经过linear之后维度变为[batch_size,h(8),seq_len,d_model/h(64)]
#为了保证在attention时mask的维度与经过linear之后的qkv也一样
#将mask扩展为[batch_size,1,seq_len,seq_len]
#八个头都用一样的mask
mask = mask.unsqueeze(1)
nbatches = query.size(0)
#q、k、v分别过线形层,[batch_size,seq_len,512]->[batch_size,8,seq_len,64]
#torch.transpose(dim1,dim2)表示将张量的dim1,dim2两个维度交换
query, key, value = [
lin(x).view(nbatches, -1, self.h, self.d_k).transpose(1, 2)
for lin, x in zip(self.linears, (query, key, value))
]
#做attention,所有bacth及head一起做
#x的维度[bacth_size,8,seq_len,64]
#self.attn的维度[bacth_size,8,seq_len,seq_len]
x, self.attn = attention(
query, key, value, mask=mask, dropout=self.dropout
)
# concat,将八个attention计算得到的64维矩阵concat,变成512
#先transpose,从[batch_size,8,seq_len,64]->[batch_size,seq_len,8,64]
#view,从[batch_size,seq_len,8,64]->[batch_size,seq_len,512]
x = (
x.transpose(1, 2)
.contiguous()
.view(nbatches, -1, self.h * self.d_k)
)
del query
del key
del value
#最后再过一个全连接层
return self.linears[-1](x)
3 refine
这个模块一般是连接在FFN 后面的。一般原版的transformer 的feed-forward networks由两个线形层及ReLU激活函数构成,对应公式如下图。
class PositionwiseFeedForward(nn.Module):
"Implements FFN equation."
def __init__(self, d_model, d_ff, dropout=0.1):
super(PositionwiseFeedForward, self).__init__()
self.w_1 = nn.Linear(d_model, d_ff)
self.w_2 = nn.Linear(d_ff, d_model)
self.dropout = nn.Dropout(dropout)
def forward(self, x):
return self.w_2(self.dropout(self.w_1(x).relu()))
对于这个SparseBox3DRefinementModule,应该是一个细化过程。
refine_layer=dict(
type="SparseBox3DRefinementModule",
embed_dims=embed_dims,
num_cls=num_classes,
refine_yaw=True,
with_quality_estimation=with_quality_estimation,
),
好像是个线性层,带了预测分支、质量分支(质量分支是v3特色).
源代码路径Sparse4D-main\projects\mmdet3d_plugin\models\detection3d\detection3d_blocks.py
对比 v2 和 v3 版本:
在 初始化该模块时增加了质量估计模块,提升性能。
self.with_quality_estimation = with_quality_estimation
if with_quality_estimation:
self.quality_layers = nn.Sequential(
*linear_relu_ln(embed_dims, 1, 2),
Linear(self.embed_dims, 2),
)
推理时增加了quality 输出。
...
quality = self.quality_layers(feature)
return output, cls, quality
该模块输入
instance_feature: 输入的实例特征,类型为PyTorch张量(tensor)。
anchor: 可能是某种“锚点”或基准特征,也为PyTorch张量。
anchor_embed: 锚点的嵌入或某种表示,同样为PyTorch张量。
time_interval: 一个时间间隔值,默认为1.0,可能用于后续的速度计算。
函数内部操作:
特征融合:
output = self.layers(instance_feature + anchor_embed)
将instance_feature与anchor_embed相加,然后传递给模型的self.layers进行处理。结果存储在output中。
其中
self.layers = nn.Sequential(
*linear_relu_ln(embed_dims, 2, 2),
Linear(self.embed_dims, self.output_dim),
Scale([1.0] * self.output_dim),
)
状态细化:
output[..., self.refine_state] = (
output[..., self.refine_state] + anchor[..., self.refine_state]
)
在output的某个特定维度(由self.refine_state指定)上,用anchor的相应值进行细化或更新。
Yaw角(可能是偏航角)的归一化
output[..., [SIN_YAW, COS_YAW]] = torch.nn.functional.normalize(
output[..., [SIN_YAW, COS_YAW]], dim=-1
)
如果self.normalize_yaw为True,则将output中与Yaw角相关的两个维度(由SIN_YAW和COS_YAW定义)进行归一化。
速度计算(如果self.output_dim大于8):
translation = torch.transpose(output[..., VX:], 0, -1)
velocity = torch.transpose(translation / time_interval, 0, -1)
output[..., VX:] = velocity + anchor[..., VX:]
如果time_interval不是PyTorch张量,则将其转换为与instance_feature相同类型的张量。
从output中提取与速度相关的部分(由VX定义),并计算速度。
使用time_interval将“位移”转换为“速度”,并更新output中与速度相关的部分。
分类结果:
cls = self.cls_layers(instance_feature)
如果return_cls为True,并且模型具有分类分支(self.with_cls_branch为True),则使用instance_feature作为输入计算分类结果,并存储在cls中。 否则,cls被设置为None。其中
if with_cls_branch:
self.cls_layers = nn.Sequential(
*linear_relu_ln(embed_dims, 1, 2),
Linear(self.embed_dims, self.num_cls),
)
v3 中还补充了质量分支
if with_quality_estimation:
self.quality_layers = nn.Sequential(
*linear_relu_ln(embed_dims, 1, 2),
Linear(self.embed_dims, 2),
)
quality = self.quality_layers(instance_feature + anchor_embed)
返回值:
函数返回两个值:output(经过处理和更新的特征)和cls(如果return_cls为True,则为分类结果;否则为None)。
4 Dense Depth Supervision
DenseDepthNet
代码在blocks.py
这个forward
函数定义了一个神经网络模块的前向传播过程,特别是用于处理特征图(feature_maps
)并输出深度预测(depths
)或深度预测的损失(loss
)。下面我将详细解释这个函数的工作流程:
-
输入参数:
feature_maps
: 输入的特征图列表或元组,通常来自网络中的不同层。focal
: 一个可选的缩放因子,用于在预测深度时调整每个特征图的权重。如果没有提供,则使用self.equal_focal
作为默认值。gt_depths
: 真实(或称为ground truth)的深度值,仅在训练时使用。
-
定义的模块
for i in range(num_depth_layers):
self.depth_layers.append(
nn.Conv2d(embed_dims, 1, kernel_size=1, stride=1, padding=0)
)
- 初始化
depths
列表:- 创建一个空列表
depths
,用于存储每个特征图对应的深度预测。
- 创建一个空列表
- 遍历特征图并计算深度预测:
- 通过循环遍历
feature_maps
中的前self.num_depth_layers
个特征图。 - 对于每个特征图,使用相应的
depth_layers[i]
层(可能是一个全连接层或其他类型的层)进行前向传播。
depth = self.depth_layers[i](feat.flatten(end_dim=1).float()).exp()
depth = depth.transpose(0, -1) * focal / self.equal_focal
depth = depth.transpose(0, -1)
depths.append(depth)
- 将特征图展平(
flatten(end_dim=1)
)并转换为浮点数(float()
),然后传递给depth_layers[i]
。 - 对输出的深度值应用指数函数(
exp()
),这通常用于将网络输出的对数深度值转换为线性深度值。 - 通过乘以
focal
并除以self.equal_focal
来调整深度预测值。这可能是为了平衡不同特征图对最终深度预测的贡献。 - 将调整后的深度预测添加到
depths
列表中。
5 穿插在操作顺序中的小细节:时序处理bank 和 box编码
instance_bank=dict(
type="InstanceBank",
num_anchor=900,
embed_dims=embed_dims,
anchor="nuscenes_kmeans900.npy",
anchor_handler=dict(type="SparseBox3DKeyPointsGenerator"),
num_temp_instances=600 if temporal else -1,
confidence_decay=0.6,
feat_grad=False,
),
anchor_encoder=dict(
type="SparseBox3DEncoder",
vel_dims=3,
embed_dims=[128, 32, 32, 64] if decouple_attn else 256,
mode="cat" if decouple_attn else "add",
output_fc=not decouple_attn,
in_loops=1,
out_loops=4 if decouple_attn else 2,
),
在 InstanceBank 中,设定了 get() update() cache() 等方法。
get()
一个名为 get 的方法,用于获取或处理与实例特征(instance_feature)和锚点(anchor)相关的数据。
初始扩展:
instance_feature 和 anchor 通过 torch.tile 方法被复制和扩展,以适应给定的 batch_size。这通常是为了将单个特征或锚点扩展为批处理中的每个元素所共有的特征或锚点。
instance_feature = torch.tile(
self.instance_feature[None], (batch_size, 1, 1)
)
anchor = torch.tile(self.anchor[None], (batch_size, 1, 1))
检查缓存的锚点:
代码检查 self.cached_anchor 是否存在,并且其第一个维度(即批处理大小)是否与当前请求的 batch_size 相同。
如果条件满足,代码将计算时间间隔(time_interval),这是通过当前请求的元数据(metas)中的时间戳与缓存元数据(self.metas)中的时间戳之间的差异来计算的。
然后,它创建一个掩码(self.mask),该掩码标识时间间隔是否小于或等于某个最大时间间隔(self.max_time_interval)。
if (
self.cached_anchor is not None
and batch_size == self.cached_anchor.shape[0]
):
history_time = self.metas["timestamp"]
time_interval = metas["timestamp"] - history_time
time_interval = time_interval.to(dtype=instance_feature.dtype)
self.mask = torch.abs(time_interval) <= self.max_time_interval
处理锚点投影:
如果 self.anchor_handler 存在,也就是SparseBox3DKeyPointsGenerator
,代码将计算从临时坐标系到当前坐标系的转换矩阵(T_temp2cur)。这是通过将 metas[“img_metas”] 中的每个元素的 T_global 与 self.metas[“img_metas”][i][“T_global_inv”] 相乘并堆叠结果来完成的。
使用 anchor_handler 的 anchor_projection 方法,根据计算出的时间间隔和转换矩阵,将 self.cached_anchor 投影到当前坐标系。
如果 dn_metas 也存在并且其批处理大小与 batch_size 相同,则对 dn_metas[“dn_anchor”] 进行相同的处理。
if self.anchor_handler is not None:
T_temp2cur = self.cached_anchor.new_tensor(
np.stack(
[
x["T_global_inv"]
@ self.metas["img_metas"][i]["T_global"]
for i, x in enumerate(metas["img_metas"])
]
)
)
self.cached_anchor = self.anchor_handler.anchor_projection(
self.cached_anchor,
[T_temp2cur],
time_intervals=[-time_interval],
)[0]
基于之前计算的掩码进一步处理或修改 time_interval。目的是更新time_interval张量:对于满足条件(非零且小于或等于self.max_time_interval)的时间间隔,它保持不变;对于不满足条件的时间间隔,它被设置为self.default_time_interval。这种操作在处理时间序列数据或具有时间依赖性的任务时很常见,尤其是在需要处理缺失值或异常值的情况下。
time_interval = torch.where(
torch.logical_and(time_interval != 0, self.mask),
time_interval,
time_interval.new_tensor(self.default_time_interval),
)
update 方法
update 方法似乎是用于更新实例特征和锚点,特别是在一个跟踪或目标检测的上下文中,其中可能涉及到历史缓存的特征和锚点。以下是该方法的详细解释:
初始化检查:
如果 self.cached_feature 是 None,则直接返回传入的 instance_feature 和 anchor,不进行任何更新。
if self.cached_feature is None:
return instance_feature, anchor
处理动态数量(num_dn):
检查传入的 instance_feature 的第二个维度(通常是特征数量)是否大于 self.num_anchor(可能是预定义的锚点数量)。
如果是,则计算 num_dn(动态特征的数量),并从 instance_feature 和 anchor 中分割出动态部分(dn_instance_feature 和 dn_anchor)。
接着,将 instance_feature、anchor 和 confidence 裁剪到 self.num_anchor 的大小,以仅保留非动态的部分。
num_dn = 0
if instance_feature.shape[1] > self.num_anchor:
num_dn = instance_feature.shape[1] - self.num_anchor
dn_instance_feature = instance_feature[:, -num_dn:]
dn_anchor = anchor[:, -num_dn:]
instance_feature = instance_feature[:, : self.num_anchor]
anchor = anchor[:, : self.num_anchor]
confidence = confidence[:, : self.num_anchor]
选择高置信度的实例:
计算 N,即要保留的锚点数量(从总锚点数量中减去临时实例的数量)。
使用 confidence.max(dim=-1).values 获取每个实例的最大置信度值。
使用 topk 函数(该函数未在代码段中定义,但似乎是从 confidence 中选择前 N 个最大值的索引)来选择高置信度的特征和锚点。
将选择的特征和锚点与缓存的特征和锚点拼接起来。
N = self.num_anchor - self.num_temp_instances
confidence = confidence.max(dim=-1).values
_, (selected_feature, selected_anchor) = topk(
confidence, N, instance_feature, anchor
)
selected_feature = torch.cat(
[self.cached_feature, selected_feature], dim=1
)
selected_anchor = torch.cat(
[self.cached_anchor, selected_anchor], dim=1
)
应用掩码:
使用 torch.where 函数和 self.mask(一个布尔掩码,通常表示哪些实例是有效的或活动的)来更新 instance_feature 和 anchor。如果某个实例在掩码中是活动的(True),则使用选定的特征或锚点;否则,保持原始值。
instance_feature = torch.where(
self.mask[:, None, None], selected_feature, instance_feature
)
anchor = torch.where(self.mask[:, None, None], selected_anchor, anchor)
更新实例ID:
如果 self.instance_id 不为 None,则使用类似的逻辑来更新实例ID。对于在 self.mask 中标记为活动的实例,保持其ID不变;对于其他实例,将ID设置为 -1(可能表示无效或未定义的ID)。
if self.instance_id is not None:
self.instance_id = torch.where(
self.mask[:, None],
self.instance_id,
self.instance_id.new_tensor(-1),
)
添加动态特征:
如果之前计算出了 num_dn(动态特征的数量)大于0,则将 dn_instance_feature 和 dn_anchor 拼接回 instance_feature 和 anchor 的末尾。
if num_dn > 0:
instance_feature = torch.cat(
[instance_feature, dn_instance_feature], dim=1
)
anchor = torch.cat([anchor, dn_anchor], dim=1)
返回更新后的特征和锚点:
返回更新后的 instance_feature 和 anchor。
这个方法的主要目的是:
利用历史缓存的特征和锚点来扩展当前的特征和锚点集。
根据置信度选择高置信度的实例。
使用掩码来更新哪些实例是活动的或有效的。
处理可能存在的动态特征(这些特征的数量可能会变化)。
cache方法
cache方法似乎是在一个目标检测或跟踪的上下文中,用于缓存一部分具有高置信度的实例的特征、锚点和相应的置信度。以下是该方法的详细解释:
检查num_temp_instances:
如果self.num_temp_instances小于或等于0,则不执行任何缓存操作,直接返回。
if self.num_temp_instances <= 0:
return
从计算图中分离:
使用.detach()方法从计算图中分离instance_feature、anchor和confidence,这样在后续操作中,这些张量就不会被用于反向传播。
instance_feature = instance_feature.detach()
anchor = anchor.detach()
confidence = confidence.detach()
保存元信息:
如果提供了metas参数,将其保存到self.metas中。
self.metas = metas
处理置信度:
对confidence执行max(dim=-1).values操作,沿着最后一个维度取最大值,得到每个实例的最高置信度值。
接着使用sigmoid()函数对置信度进行转换,虽然这通常用于将输出转换为概率值,但在这里可能只是为了将置信度值归一化到0到1之间。
confidence = confidence.max(dim=-1).values.sigmoid()
更新缓存的置信度:
如果self.confidence(之前缓存的置信度)不为None,则更新前self.num_temp_instances个实例的置信度。这里使用了torch.maximum函数,取self.confidence * self.confidence_decay(衰减后的旧置信度)和当前计算出的新置信度中的较大值。这是为了保留之前高置信度的实例,同时允许新的高置信度实例进入缓存。
if self.confidence is not None:
confidence[:, : self.num_temp_instances] = torch.maximum(
self.confidence * self.confidence_decay,
confidence[:, : self.num_temp_instances],
)
选择并缓存高置信度的实例:
使用topk函数(未在代码段中定义,但根据上下文,它可能是选择前k个最大值的函数)来选择置信度最高的self.num_temp_instances个实例的特征、锚点和置信度。
self.temp_confidence = confidence
(
self.confidence,
(self.cached_feature, self.cached_anchor),
) = topk(confidence, self.num_temp_instances, instance_feature, anchor)
将选择的特征和锚点保存到self.cached_feature和self.cached_anchor中,并将选择的置信度保存到self.confidence中。
同时,将当前的置信度(包括可能已更新的部分)保存到self.temp_confidence中,这可能是为了后续的临时使用或进一步的更新。
这个方法的主要目的是从一组实例中选择一部分具有高置信度的实例,并将它们的特征、锚点和置信度缓存起来,以供后续使用。这在多目标跟踪或目标检测任务中很常见,尤其是在需要维持一个活动实例集合并对其进行更新时。
get_instance_id方法
get_instance_id方法似乎是为了为目标检测或跟踪任务中的每个实例分配一个唯一的ID。以下是该方法的详细解释:
处理置信度:
使用max(dim=-1).values沿着最后一个维度取最大值,得到每个实例的最高置信度值。
接着使用sigmoid()函数对置信度进行转换,但这通常在这种上下文中是不必要的,因为max操作已经给出了每个实例的最高置信度分数。这里可能是为了保持与其他部分的代码一致性或归一化置信度值。
confidence = confidence.max(dim=-1).values.sigmoid()
初始化实例ID:
创建一个与confidence形状相同的新张量instance_id,并用-1填充。这表示所有实例在开始时都没有被分配ID。
instance_id = confidence.new_full(confidence.shape, -1).long()
使用旧的实例ID(如果可用):
如果self.instance_id(之前分配的实例ID)不为None,并且其第一个维度的大小与instance_id的第一个维度相同,则将self.instance_id的值复制到instance_id的对应位置。
确定需要新ID的实例:
创建一个布尔掩码mask,其中instance_id为负值的位置为True,表示这些实例还没有被分配ID。
如果提供了threshold,则更新mask,仅包含那些置信度大于或等于threshold的实例。
mask = instance_id < 0
if threshold is not None:
mask = mask & (confidence >= threshold)
分配新的实例ID:
计算需要新ID的实例数量num_new_instance。
使用torch.arange创建一个从self.prev_id开始、长度为num_new_instance的整数序列作为新的实例ID。
将这些新的实例ID赋值给instance_id中mask为True的位置。
更新self.prev_id以反映已经分配的新ID数量。
num_new_instance = mask.sum()
new_ids = torch.arange(num_new_instance).to(instance_id) + self.prev_id
instance_id[torch.where(mask)] = new_ids
self.prev_id += num_new_instance
获取临时置信度:
如果self.temp_confidence(之前缓存的置信度)为None,则根据传入的confidence张量计算临时置信度temp_conf。
如果confidence是三维的,通过max(dim=-1).values沿着最后一个维度(类别维度)取最大值,得到每个锚点的最高置信度。
如果confidence是二维的,则直接使用它作为临时置信度。
如果self.temp_confidence不为None,则直接使用它作为temp_conf。
if self.temp_confidence is None:
if confidence.dim() == 3: # bs, num_anchor, num_cls
temp_conf = confidence.max(dim=-1).values
else: # bs, num_anchor
temp_conf = confidence
else:
temp_conf = self.temp_confidence
选择高置信度的实例ID:
使用topk函数(该方法未在代码段中定义,但通常用于选择张量中最大的k个值及其索引)来选择temp_conf中置信度最高的self.num_temp_instances个实例的ID(假设instance_id是这些实例ID的索引或表示)。
topk返回两个值:值和索引。这里我们只关心索引(即实例ID),所以通过[1][0]获取它们。
使用squeeze(dim=-1)去除可能存在的最后一个维度(如果instance_id是一个二维张量,但只包含一个元素)。
更新并保存实例ID:
使用F.pad(其中F可能是torch.nn.functional的别名)将选定的instance_id填充到更大的张量中,其中填充值为-1(表示尚未分配的ID)。填充的大小是self.num_anchor - self.num_temp_instances,这意味着将所有选定的实例ID放在前面,后面跟着未分配的ID(-1)。
将填充后的instance_id保存到self.instance_id中,以便后续使用。
instance_id = topk(temp_conf, self.num_temp_instances, instance_id)[1][
0
]
instance_id = instance_id.squeeze(dim=-1)
self.instance_id = F.pad(
instance_id,
(0, self.num_anchor - self.num_temp_instances),
value=-1,
)
这个方法的目的是根据实例的置信度来更新和保存一个表示实例ID的张量。它确保只有置信度最高的self.num_temp_instances个实例被分配了有效的ID(非-1值),而其余实例的ID被设置为-1,表示它们尚未被分配或需要进一步的关注。
这个方法的目的是确保每个实例都有一个唯一的ID,并且当新的实例出现时,它们会被分配新的、递增的ID。通过保留和重用旧的实例ID(如果它们仍然有效),该方法可以提高效率并减少不必要的ID分配。同时,通过提供置信度阈值,可以进一步过滤掉那些不太可能是真实目标的实例。
SparseBox3DKeyPointsGenerator
处理锚点投影:
如果 self.anchor_handler 存在,也就是SparseBox3DKeyPointsGenerator
,代码将计算从临时坐标系到当前坐标系的转换矩阵(T_temp2cur)。
学习关键点
forward 的函数,似乎正在进行某种3D目标检测或跟踪任务,其中根据锚点(anchor)和实例特征(如果有的话)来计算关键点(key points)的位置,并可能根据时间戳和变换矩阵将这些关键点转换到不同的时间步。
anchor: 锚点的张量,通常包含目标的位置、尺寸和方向等信息。
instance_feature: 实例特征张量(可选),可能用于学习关键点的缩放。
初始化和固定尺度关键点:
从锚点中获取批量大小和锚点数量。
根据锚点的尺寸计算初始关键点位置(固定尺度)。
bs, num_anchor = anchor.shape[:2]
size = anchor[..., None, [W, L, H]].exp()
key_points = self.fix_scale * size
学习关键点缩放:
如果num_learnable_pts大于0且提供了instance_feature,则通过全连接层learnable_fc学习关键点的缩放。
缩放值是经过sigmoid函数处理并减去0.5的,这样缩放值就在-0.5到0.5之间。
将学习到的缩放应用到初始关键点位置上。
if self.num_learnable_pts > 0 and instance_feature is not None:
learnable_scale = (
self.learnable_fc(instance_feature)
.reshape(bs, num_anchor, self.num_learnable_pts, 3)
.sigmoid()
- 0.5
)
key_points = torch.cat(
[key_points, learnable_scale * size], dim=-2
)
旋转关键点:
创建一个旋转矩阵rotation_mat,根据锚点中的偏航角(yaw)的正弦和余弦值来设置。
将关键点与旋转矩阵相乘,以根据锚点的方向旋转关键点。
将旋转后的关键点与锚点的位置相加,得到最终的关键点位置。
rotation_mat = anchor.new_zeros([bs, num_anchor, 3, 3])
rotation_mat[:, :, 0, 0] = anchor[:, :, COS_YAW]
rotation_mat[:, :, 0, 1] = -anchor[:, :, SIN_YAW]
rotation_mat[:, :, 1, 0] = anchor[:, :, SIN_YAW]
rotation_mat[:, :, 1, 1] = anchor[:, :, COS_YAW]
rotation_mat[:, :, 2, 2] = 1
key_points = torch.matmul(
rotation_mat[:, :, None], key_points[..., None]
).squeeze(-1)
key_points = key_points + anchor[..., None, [X, Y, Z]]
时间变换:
如果提供了时间戳和变换矩阵,方法将计算过去时间步的关键点位置。
对于temp_timestamps中的每个时间戳,计算当前时间戳与该时间戳之间的时间间隔。
计算时间间隔和平移:
time_interval: 计算当前时间戳cur_timestamp与t_time之间的时间间隔。
translation: 根据速度和时间间隔计算每个关键点的平移量。这里使用了广播机制(broadcasting)来确保time_interval与velocity的维度兼容。
time_interval = cur_timestamp - t_time
translation = (
velocity
* time_interval.to(dtype=velocity.dtype)[:, None, None]
)
应用平移:
从key_points中减去平移量translation,得到临时关键点位置temp_key_points。
temp_key_points = key_points - translation[:, :, None]
应用坐标变换:
T_cur2temp: 从T_cur2temp_list中获取对应的坐标变换矩阵。
将temp_key_points与单位向量(使用torch.ones_like创建)拼接,形成齐次坐标。
使用矩阵乘法@将齐次坐标与变换矩阵相乘,实现坐标变换。
去除最后一维的None,因为矩阵乘法后多了一个维度。
T_cur2temp = T_cur2temp_list[i].to(dtype=key_points.dtype)
temp_key_points = (
T_cur2temp[:, None, None, :3]
@ torch.cat(
[
temp_key_points,
torch.ones_like(temp_key_points[..., :1]),# 形成齐次坐标
],
dim=-1,
).unsqueeze(-1)
)
temp_key_points = temp_key_points.squeeze(-1)
temp_key_points_list.append(temp_key_points)
返回结果
return key_points, temp_key_points_list
投影函数
anchor_projection 的函数,它接受一个锚点(anchor)和一些转换矩阵(T_src2dst_list),并将锚点从源坐标系投影到目标坐标系,同时考虑时间间隔(如果存在)对锚点位置的影响。
a. 提取速度: 从 anchor 中提取速度信息 vel。
b. 计算时间间隔: 如果提供了 time_intervals,则直接使用它;否则,如果提供了 src_timestamp 和 dst_timestamps,则计算时间间隔 time_interval。
c. 应用时间间隔到中心位置: 如果 time_interval 不为 None,则根据速度和时间间隔计算平移量 translation,并从锚点的中心位置 center 中减去平移量。
if time_interval is not None:
translation = vel.transpose(0, -1) * time_interval
translation = translation.transpose(0, -1)
center = center - translation
d. 应用转换矩阵到中心位置: 使用 T_src2dst 转换矩阵将中心位置从源坐标系转换到目标坐标系。
center = (
torch.matmul(
T_src2dst[..., :3, :3], center[..., None]
).squeeze(dim=-1)
+ T_src2dst[..., :3, 3]
)
e. 处理尺寸: 直接将锚点的尺寸 size 复制到目标锚点中(假设尺寸在转换过程中保持不变)。
size = anchor[..., [W, L, H]]
f. 处理偏航角: 使用 T_src2dst 转换矩阵的前两行两列来计算偏航角在目标坐标系中的表示。
yaw = torch.matmul(
T_src2dst[..., :2, :2],
anchor[..., [COS_YAW, SIN_YAW], None],
).squeeze(-1)
g. 处理速度: 使用 T_src2dst 转换矩阵中与速度维度相对应的部分来转换速度。
vel = torch.matmul(
T_src2dst[..., :vel_dim, :vel_dim], vel[..., None]
).squeeze(-1)
h. 组合目标锚点: 将转换后的中心位置、尺寸、偏航角和速度组合成一个新的锚点张量 dst_anchor,并将其添加到 dst_anchors 列表中。
dst_anchor = torch.cat([center, size, yaw, vel], dim=-1)
dst_anchors.append(dst_anchor)
return dst_anchors
box 编码
定义了一个神经网络层(或模块)的forward方法,它接受一个三维边界框(box_3d)作为输入,并输出一个特征向量。以下是这段代码的详细解释:
输入:
box_3d: 一个形状为 [batch_size, num_boxes, num_dims] 的张量,其中 num_dims 是三维边界框的属性数量。这些属性可能包括位置(X, Y, Z)、大小(W, L, H)、偏航角(yaw,通常以正弦和余弦的形式给出)以及可能的速度分量(VX, …)。
特征提取:
从box_3d中提取位置特征、大小特征和偏航角特征。这是通过索引不同的维度并使用全连接层(self.pos_fc, self.size_fc, self.yaw_fc)来实现的。
pos_feat = self.pos_fc(box_3d[…, [X, Y, Z]]):从box_3d中提取位置(X, Y, Z)并使用pos_fc全连接层进行处理。
size_feat = self.size_fc(box_3d[…, [W, L, H]]):从box_3d中提取大小(W, L, H)并使用size_fc全连接层进行处理。
yaw_feat = self.yaw_fc(box_3d[…, [SIN_YAW, COS_YAW]]):从box_3d中提取偏航角的正弦和余弦值并使用yaw_fc全连接层进行处理。
self.pos_fc = embedding_layer(3, embed_dims[0])
self.size_fc = embedding_layer(3, embed_dims[1])
self.yaw_fc = embedding_layer(2, embed_dims[2])
pos_feat = self.pos_fc(box_3d[..., [X, Y, Z]])
size_feat = self.size_fc(box_3d[..., [W, L, H]])
yaw_feat = self.yaw_fc(box_3d[..., [SIN_YAW, COS_YAW]])
特征组合:
根据self.mode的值(“add"或"cat”),将提取的特征进行相加或拼接。
如果self.mode == “add” 则使用元素级加法将特征相加。
如果self.mode == “cat”,则使用torch.cat沿着最后一个维度(dim=-1)将特征拼接起来。
if self.mode == "add":
output = pos_feat + size_feat + yaw_feat
elif self.mode == "cat":
output = torch.cat([pos_feat, size_feat, yaw_feat], dim=-1)
速度特征(可选):
如果self.vel_dims > 0,则从box_3d中提取速度特征。
vel_feat = self.vel_fc(box_3d[…, VX : VX + self.vel_dims]):从box_3d中提取速度分量并使用vel_fc全连接层进行处理。
同样地,根据self.mode的值,将速度特征与之前的输出特征相加或拼接。
if self.vel_dims > 0:
vel_feat = self.vel_fc(box_3d[..., VX : VX + self.vel_dims])
if self.mode == "add":
output = output + vel_feat
elif self.mode == "cat":
output = torch.cat([output, vel_feat], dim=-1)
输出层(可选):
如果self.output_fc不是None,则将组合后的特征通过output_fc全连接层进行进一步处理。
if self.output_fc is not None:
output = self.output_fc(output)
返回:
返回处理后的特征向量output。
这种设计允许该层以不同的方式组合输入的三维边界框特征,并且可以选择性地包括速度特征和额外的输出层。这种灵活性对于各种3D目标检测或跟踪任务可能是有用的。
V3 升级
去噪的想法
对比 v2 和 v3 ,增加了去噪的配置。引入去噪任务被证明是提高模型收敛稳定性和检测性能的有效方法。论文中描述了一个在3D目标检测任务中引入去噪任务的方法,以及这种方法如何扩展到3D时间(temporal)去噪来提高模型的收敛稳定性和检测性能。
二维检测中的去噪任务:
在二维目标检测中,引入去噪任务(即让模型学习区分真实的目标和噪声或假阳性样本)已经被证明是一种提高模型稳定性和性能的有效方法。
从2D到3D temporal去噪:
作者将这一方法从二维空间扩展到三维空间,并进一步引入了时间维度,即3D temporal去噪。这意味着在连续的3D帧(例如视频或时间序列数据)中,模型不仅要学习识别目标,还要学习区分跨帧的噪声。
Sparse4D中的实例解耦:
Sparse4D是一个3D目标检测框架,其中实例(instances)被解耦为隐式实例特征和显式锚点(anchors)。隐式特征捕捉目标的语义信息,而显式锚点则代表目标在3D空间中的可能位置。
锚点初始化:
初始化两组锚点:一组是均匀分布在检测空间中的,使用k均值方法初始化,并作为可学习参数;另一组是通过向地面实况(GT)添加噪声来生成的。
噪声锚点生成:
噪声锚点是通过向GT添加随机噪声来生成的。噪声的范围和分布是专门为3D检测任务定制的,以确保生成的噪声锚点在空间上合理分布。
正样本和负样本的确定:
在DINO-DETR中,简单地将基于不同噪声生成的样本分类为阳性或阴性可能会导致错误分配,因为噪声样本可能与真实目标接近。因此,使用二分图匹配来确定正样本和负样本,以确保准确性。
Temporal广播和噪声实例的时间传播:
将单帧噪声实例扩展到多帧,即temporal广播。在每一帧中,从噪声实例中随机选择一部分投影到下一帧。这有助于模型学习跨帧的目标和噪声的演变。
实例的独立性:
在整个过程中,保持每组实例的相互独立性,并且在噪声实例和正常实例之间不会发生特征交互。这有助于避免潜在的歧义和错误传播。
这种方法通过引入去噪任务和temporal去噪,不仅提高了模型在单帧中的检测性能,还增强了模型在连续帧中的稳定性和鲁棒性。这在处理如自动驾驶或视频分析等需要跨帧关联和追踪目标的任务时尤为重要。
sampler=dict(
type="SparseBox3DTarget",
num_dn_groups=5,
num_temp_dn_groups=3,
dn_noise_scale=[2.0] * 3 + [0.5] * 7,
max_dn_gt=32,
add_neg_dn=True,
cls_weight=2.0,
box_weight=0.25,
reg_weights=[2.0] * 3 + [0.5] * 3 + [0.0] * 4,
cls_wise_reg_weights={
class_names.index("traffic_cone"): [
2.0,
2.0,
2.0,
1.0,
1.0,
1.0,
0.0,
0.0,
1.0,
1.0,
],
},
),
定义了一个名为 sampler 的字典配置,这个配置通常用于3D目标检测或类似任务中的数据采样策略。SparseBox3DTarget。这通常意味着采样器将用于处理3D边界框(bounding boxes)的稀疏目标。
num_dn_groups:5: 这可能指定了不同难度的负样本(negative samples)的组数。在目标检测中,负样本是那些不包含目标对象的区域。
num_temp_dn_groups:3: 这可能是与某种时间或序列相关的负样本组数。具体意义取决于上下文,但通常与时间序列数据或视频数据中的帧相关。
dn_noise_scale: [ 2.0 ] ∗ 3 + [ 0.5 ] ∗ 7 [2.0] * 3 + [0.5] * 7 [2.0]∗3+[0.5]∗7: 这是一个列表,用于指定不同维度或属性的噪声缩放因子。这里,前三个维度的噪声缩放因子为2.0,接下来的七个维度为0.5。这可能是用于在生成负样本时添加噪声,以增加模型的鲁棒性。
在pipeline 中,最先出现的是:去噪模块,从sampler.get_dn_anchors得到 noisy-anchors 和 corresponding GT
get_dn_anchors() 用于生成某种“dn_anchors”(可能是指某种噪声增强的锚框)。以下是对该代码的详细解释:
函数定义:
get_dn_anchors 是一个方法,它接受四个参数:cls_target(类别目标),box_target(边界框目标),gt_instance_id(真实实例ID,默认为None)。
这个方法似乎与某种“dn_groups”和“temp_dn_groups”有关,可能是指动态锚框或某种分组策略。
初始检查:
如果 num_dn_groups 或 num_temp_dn_groups 小于或等于0,则直接返回None。
如果 max_dn_gt 大于0,则将 cls_target、box_target 和 gt_instance_id(如果非None)截断到 max_dn_gt 的长度。
if self.num_dn_groups <= 0:
return None
if self.num_temp_dn_groups <= 0:
gt_instance_id = None
填充和调整目标:
确定 cls_target、box_target 和 gt_instance_id 中的最大长度 max_dn_gt。
使用 F.pad 对这些目标进行填充,使其都具有相同的长度 max_dn_gt。对于类别目标,填充值为-1;对于其他目标,填充值可能根据上下文确定。
box_target 被编码(可能是转换为某种格式或归一化)。
如果某个类别的目标为-1(即填充值),则将其对应的边界框目标设置为0。
if self.max_dn_gt > 0:
cls_target = [x[: self.max_dn_gt] for x in cls_target]
box_target = [x[: self.max_dn_gt] for x in box_target]
if gt_instance_id is not None:
gt_instance_id = [x[: self.max_dn_gt] for x in gt_instance_id]
max_dn_gt = max([len(x) for x in cls_target])
if max_dn_gt == 0:
return None
cls_target = torch.stack(
[
F.pad(x, (0, max_dn_gt - x.shape[0]), value=-1)
for x in cls_target
]
)
box_target = self.encode_reg_target(box_target, cls_target.device)
box_target = torch.stack(
[F.pad(x, (0, 0, 0, max_dn_gt - x.shape[0])) for x in box_target]
)
box_target = torch.where(
cls_target[..., None] == -1, box_target.new_tensor(0), box_target
)
if gt_instance_id is not None:
gt_instance_id = torch.stack(
[
F.pad(x, (0, max_dn_gt - x.shape[0]), value=-1)
for x in gt_instance_id
]
)
bs, num_gt, state_dims = box_target.shape
复制目标(如果有多组dn_groups):
如果 num_dn_groups 大于1,则复制 cls_target、box_target 和 gt_instance_id(如果非None)以匹配 num_dn_groups 的数量。
bs, num_gt, state_dims = box_target.shape
if self.num_dn_groups > 1:
cls_target = cls_target.tile(self.num_dn_groups, 1)
box_target = box_target.tile(self.num_dn_groups, 1, 1)
if gt_instance_id is not None:
gt_instance_id = gt_instance_id.tile(self.num_dn_groups, 1)
添加噪声:
生成一个与 box_target 形状相同的随机噪声 noise,并将其缩放到某个范围(由 dn_noise_scale 确定)。
将 noise 加到 box_target 上,生成“dn_anchor”。
noise = torch.rand_like(box_target) * 2 - 1: 生成与box_target形状相同的随机噪声,范围在[-1, 1]之间。
noise *= box_target.new_tensor(self.dn_noise_scale): 将噪声乘以一个缩放因子(self.dn_noise_scale)。
dn_anchor = box_target + noise: 将噪声加到box_target上,得到扰动的锚点。
用于生成扰动(或噪声)的锚点(anchor)和目标(box_target),并进行一些匹配和赋值操作。以下是代码的详细解释:
添加负样本的扰动锚点(如果self.add_neg_dn为True):
生成另一种噪声noise_neg,其值为1加上随机噪声。
根据随机值决定noise_neg的符号(1或-1)。
对noise_neg进行缩放,并加到box_target上,得到另一组扰动的锚点。
将这两组扰动的锚点拼接起来,并相应地调整num_gt(可能表示真实目标框的数量)。
if self.add_neg_dn:
noise_neg = torch.rand_like(box_target) + 1
flag = torch.where(
torch.rand_like(box_target) > 0.5,
noise_neg.new_tensor(1),
noise_neg.new_tensor(-1),
)
noise_neg *= flag
noise_neg *= box_target.new_tensor(self.dn_noise_scale)
dn_anchor = torch.cat([dn_anchor, box_target + noise_neg], dim=1)
num_gt *= 2
计算锚点与真实框之间的成本(cost):
使用self._box_cost方法计算dn_anchor与box_target之间的成本。
box_cost = self._box_cost(
dn_anchor, box_target, torch.ones_like(box_target)
)
初始化扰动目标:
dn_box_target和dn_cls_target分别初始化为与dn_anchor形状相同的零张量和带有特定值的负张量。
如果提供了gt_instance_id,则dn_id_target也进行初始化。
dn_box_target = torch.zeros_like(dn_anchor)
dn_cls_target = -torch.ones_like(cls_target) * 3
if gt_instance_id is not None:
dn_id_target = -torch.ones_like(gt_instance_id)
if self.add_neg_dn:
dn_cls_target = torch.cat([dn_cls_target, dn_cls_target], dim=1)
if gt_instance_id is not None:
dn_id_target = torch.cat([dn_id_target, dn_id_target], dim=1)
根据成本进行锚点与真实框的匹配:
对于每个样本(dn_anchor.shape[0]次迭代),使用linear_sum_assignment(可能是匈牙利算法)找到锚点与真实框之间的最佳匹配。
将匹配到的真实框的坐标、类别和(如果提供)实例ID赋值给相应的锚点。
for i in range(dn_anchor.shape[0]):
cost = box_cost[i].cpu().numpy()
anchor_idx, gt_idx = linear_sum_assignment(cost)
anchor_idx = dn_anchor.new_tensor(anchor_idx, dtype=torch.int64)
gt_idx = dn_anchor.new_tensor(gt_idx, dtype=torch.int64)
dn_box_target[i, anchor_idx] = box_target[i, gt_idx]
dn_cls_target[i, anchor_idx] = cls_target[i, gt_idx]
if gt_instance_id is not None:
dn_id_target[i, anchor_idx] = gt_instance_id[i, gt_idx]
重新排列锚点和目标张量的形状:
使用reshape、permute和flatten方法对dn_anchor、dn_box_target等张量进行重新排列,以适应模型的后续处理。
dn_anchor = (
dn_anchor.reshape(self.num_dn_groups, bs, num_gt, state_dims)
.permute(1, 0, 2, 3)
.flatten(1, 2)
)
dn_box_target = (
dn_box_target.reshape(self.num_dn_groups, bs, num_gt, state_dims)
.permute(1, 0, 2, 3)
.flatten(1, 2)
)
dn_cls_target = (
dn_cls_target.reshape(self.num_dn_groups, bs, num_gt)
.permute(1, 0, 2)
.flatten(1)
)
设置valid_mask
valid_mask = dn_cls_target >= 0
这行代码创建了一个valid_mask布尔张量,其形状与dn_cls_target相同,用于标记哪些项是有效的(非填充的)。在这个上下文中,dn_cls_target很可能是一个包含类别标签的张量,其中正值表示真实的类别,而-1、-2、-3等可能表示填充值或特殊类别(如“无目标”或“忽略”)。
处理负样本的cls_target(如果self.add_neg_dn为True)
if self.add_neg_dn:
cls_target = (
torch.cat([cls_target, cls_target], dim=1)
.reshape(self.num_dn_groups, bs, num_gt)
.permute(1, 0, 2)
.flatten(1)
)
valid_mask = torch.logical_or(
valid_mask, ((cls_target >= 0) & (dn_cls_target == -3))
)
- 首先,
cls_target
被重复并重新排列以适应模型的新结构(可能是为了适应负样本的添加)。 - 接着,
valid_mask
被更新。除了原先dn_cls_target
中大于等于0的项被认为是有效的外,如果cls_target
中的项大于等于0且dn_cls_target
中的对应项为-3(可能是负样本的标识),这些项也被认为是有效的。
- 设置attn_mask
attn_mask = dn_box_target.new_ones(
num_gt * self.num_dn_groups, num_gt * self.num_dn_groups
)
for i in range(self.num_dn_groups):
start = num_gt * i
end = start + num_gt
attn_mask[start:end, start:end] = 0
attn_mask = attn_mask == 1
attn_mask
首先被初始化为一个全为1的二维张量,其形状为(num_gt * self.num_dn_groups, num_gt * self.num_dn_groups)
。- 接着,对于
attn_mask
中的每个num_gt x num_gt
的子块(共有self.num_dn_groups
个这样的子块),将其元素设置为0。这意味着在同一个子块内的锚点之间不会有注意力(或关联)计算。 - 最后,
attn_mask
被转换为一个布尔张量,其中值为1的位置表示应计算注意力,值为0的位置表示不应计算注意力。
- 数据类型转换和返回结果
dn_cls_target = dn_cls_target.long()
return (
dn_anchor,
dn_box_target,
dn_cls_target,
attn_mask,
valid_mask,
dn_id_target,
)
dn_cls_target
的数据类型被转换为long
,这通常是为了确保类别标签是整数类型。- 最后,代码返回了一个包含多个元素的元组,这些元素是处理后的锚点、目标框、类别标签、注意力掩码、有效掩码和(如果提供)实例ID目标。
总之,这段代码在处理目标检测或类似任务中的锚点和目标时,考虑了正样本和负样本的添加、注意力掩码的设置以及数据类型的转换,并返回了一个包含多个处理后的张量的元组。
调用self.sampler.update_dn
方法。
- 该方法返回更新后的
instance_feature
、anchor
以及其他与DN相关的目标变量。
这段代码定义了一个名为 update_dn
的方法,该方法用于更新与某些动态(可能是难例或特殊)样本(简称 “dn”)相关的特征、锚点、目标等。以下是该代码的详细解释:
方法参数:
instance_feature
: 实例特征,形状为 (bs, num_anchor, …),其中 bs 是批次大小,num_anchor 是锚点的数量。anchor
: 锚点,形状与instance_feature
的前两个维度相同。dn_reg_target
,dn_cls_target
,valid_mask
,dn_id_target
: 与 dn 样本相关的回归目标、分类目标、有效掩码和实例ID。num_noraml_anchor
: 正常(非dn)锚点的数量。temporal_valid_mask
: 临时有效掩码,可能用于某些时间相关的更新逻辑。
方法逻辑:
- 初始检查和返回:
- 如果
temporal_valid_mask
为 None,则将self.dn_metas
设置为 None。 - 如果
self.dn_metas
为 None 或者正常锚点的数量大于或等于总锚点数量,则直接返回原始输入(不进行任何更新)。
- 如果
bs, num_anchor = instance_feature.shape[:2]
if temporal_valid_mask is None:
self.dn_metas = None
if self.dn_metas is None or num_noraml_anchor >= num_anchor:
return (
instance_feature,
anchor,
dn_reg_target,
dn_cls_target,
valid_mask,
dn_id_target,
)
- 拆分非dn和dn的实例特征和锚点:
- 计算dn锚点的数量(
num_dn
)。 - 从
instance_feature
和anchor
中拆分出dn部分和非dn部分。
- 计算dn锚点的数量(
# split instance_feature and anchor into non-dn and dn
num_dn = num_anchor - num_noraml_anchor
dn_instance_feature = instance_feature[:, -num_dn:]
dn_anchor = anchor[:, -num_dn:]
instance_feature = instance_feature[:, :num_noraml_anchor]
anchor = anchor[:, :num_noraml_anchor]
- 重新整理dn元数据的形状:
- 根据
self.num_dn_groups
(dn组的数量)将dn的实例特征、锚点、回归目标、分类目标和有效掩码重新整理成四维张量。 - 如果
dn_id_target
不为 None,也进行同样的形状调整。
- 根据
# reshape all dn metas from (bs,num_all_dn,xxx)
# to (bs, dn_group, num_dn_per_group, xxx)
num_dn_groups = self.num_dn_groups
num_dn = num_dn // num_dn_groups
dn_feat = dn_instance_feature.reshape(bs, num_dn_groups, num_dn, -1)
dn_anchor = dn_anchor.reshape(bs, num_dn_groups, num_dn, -1)
dn_reg_target = dn_reg_target.reshape(bs, num_dn_groups, num_dn, -1)
dn_cls_target = dn_cls_target.reshape(bs, num_dn_groups, num_dn)
valid_mask = valid_mask.reshape(bs, num_dn_groups, num_dn)
if dn_id_target is not None:
dn_id = dn_id_target.reshape(bs, num_dn_groups, num_dn)
- 通过实例ID更新临时dn元数据:
- 获取
self.dn_metas
中的dn_instance_feature
和dn_id_target
。 - 确定临时dn的数量和组数。
- 创建一个比较张量
match
,其形状为 (bs, num_temp_dn_groups, num_temp_dn, num_dn),用于比较当前dn的ID和临时dn的ID。这个比较将用于后续的更新逻辑(尽管此段代码并未展示完整的更新逻辑)。
- 获取
# update temp_dn_metas by instance_id
temp_dn_feat = self.dn_metas["dn_instance_feature"]
_, num_temp_dn_groups, num_temp_dn = temp_dn_feat.shape[:3]
temp_dn_id = self.dn_metas["dn_id_target"]
- ID匹配:
- 使用广播机制创建了一个四维的
match
张量,表示当前dn的ID与临时dn的ID是否匹配。
match = temp_dn_id[..., None] == dn_id[:, :num_temp_dn_groups, None]
6.回归目标和分类目标的更新:
- 对于回归目标
dn_reg_target
,只有当match
为真时,才使用当前dn的回归目标来更新临时dn的回归目标。
temp_reg_target = (match[..., None] * dn_reg_target[:, :num_temp_dn_groups, None]).sum(dim=3)
- 对于分类目标
dn_cls_target
,如果match
的所有维度都不为真(即没有匹配的ID),则使用-1
(通常表示忽略或无效)来更新临时dn的分类目标。
temp_cls_target = torch.where(
torch.all(torch.logical_not(match), dim=-1),
self.dn_metas["dn_cls_target"].new_tensor(-1),
self.dn_metas["dn_cls_target"],
)
- 准备数据:
- 提取或创建
temp_dn_metas
和dn_metas
两个列表,分别包含临时dn和当前dn的元数据。
temp_dn_metas = [
temp_dn_feat,
temp_dn_anchor,
temp_reg_target,
temp_cls_target,
temp_valid_mask,
temp_dn_id,
]
dn_metas = [
dn_feat,
dn_anchor,
dn_reg_target,
dn_cls_target,
valid_mask,
dn_id,
]
处理长度不对齐:
- 如果临时dn的数量(
num_temp_dn
)少于当前dn的数量(num_dn
),则对临时dn的元数据进行填充(padding)。
if num_temp_dn < num_dn:
pad = (0, num_dn - num_temp_dn)
if temp_meta.dim() == 4:
pad = (0, 0) + pad
else:
assert temp_meta.dim() == 3
temp_meta = F.pad(temp_meta, pad, value=0)
else:
temp_meta = temp_meta[:, :, :num_dn]
- 如果临时dn的数量多于当前dn的数量,则只保留前
num_dn
个临时dn的元数据。
时间有效性掩码的应用:
- 使用
temporal_valid_mask
来决定是否使用临时dn的元数据或当前dn的元数据。如果temporal_valid_mask
为真,则使用临时dn的元数据;否则,使用当前dn的元数据。
mask = temporal_valid_mask[:, None, None]
if meta.dim() == 4:
mask = mask.unsqueeze(dim=-1)
temp_meta = torch.where(
mask, temp_meta, meta[:, :num_temp_dn_groups]
)
组合元数据:
- 将处理后的临时dn元数据与当前dn的元数据(跳过前
num_temp_dn_groups
个)进行拼接。
将拼接后的数据展平(flatten),并添加到output
列表中。
meta = torch.cat([temp_meta, meta[:, num_temp_dn_groups:]], dim=1)
meta = meta.flatten(1, 2)
output.append(meta)
这段代码在目标检测、跟踪或其他需要处理动态样本的场景中可能很有用,特别是在需要维护一个临时样本集(可能是难例样本)来进行特殊处理时。
cache_dn的方法
将噪声实例的特征(dn_instance_feature)、锚点(dn_anchor)、分类目标(dn_cls_target)、有效掩码(valid_mask)以及噪声实例的ID(dn_id_target)缓存起来。
这个cache_dn
函数的主要目的是从输入的动态样本(Dynamic Negative samples,简称dn)数据中缓存一部分数据作为临时动态样本(temp_dn),以便后续可能的使用。以下是该函数的详细解释:
-
参数说明:
dn_instance_feature
: 动态样本的特征。dn_anchor
: 动态样本的锚点信息(通常在目标检测或跟踪任务中使用)。dn_cls_target
: 动态样本的分类目标。valid_mask
: 标识动态样本是否有效的掩码。dn_id_target
: 动态样本的ID(如果可用)。
-
条件判断:
if self.num_temp_dn_groups < 0: return
:如果num_temp_dn_groups
(要缓存的临时动态样本组的数量)小于0,则函数直接返回,不执行任何操作。
-
变量定义:
num_dn_groups
: 当前动态样本组的总数。bs, num_dn
: 批量大小(batch size)和动态样本的总数。num_temp_dn
: 每个动态样本组中要缓存的临时动态样本的数量。
-
选择临时动态样本组:
- 使用
torch.randperm(num_dn_groups)
生成一个随机排列的索引,并通过temp_group_mask
选择前self.num_temp_dn_groups
个组作为临时动态样本组。 - 将
temp_group_mask
移至与dn_anchor
相同的设备上。
- 使用
-
重新塑形和索引:
- 将输入的
dn_instance_feature
、dn_anchor
、dn_cls_target
和valid_mask
(如果可用)根据组数num_dn_groups
和每个组中的临时动态样本数num_temp_dn
进行重塑。 - 使用
temp_group_mask
对重塑后的数据进行索引,选择出临时动态样本。
- 将输入的
-
处理
dn_id_target
:- 如果
dn_id_target
不为None
,则同样对其进行重塑和索引。
- 如果
-
更新缓存:
- 将选出的临时动态样本的特征、锚点、分类目标、有效掩码和ID(如果可用)存储在一个字典
self.dn_metas
中,以便后续使用。
- 将选出的临时动态样本的特征、锚点、分类目标、有效掩码和ID(如果可用)存储在一个字典
这个函数的主要用途是在处理大量动态样本时,通过缓存一部分样本来减少计算量或存储量,同时保留一定的随机性(通过随机选择临时动态样本组)来确保缓存的样本具有一定的代表性。这在一些需要处理大量负样本或背景样本的深度学习任务中尤为有用。
缓存这些数据是为了后续进行时间降噪(temporal denoising)处理时能够利用这些噪声实例的信息。
一个能够将含有噪声的输入数据映射为不带噪声的输出数据的神经网络模型。在训练过程中,模型学习如何从含噪声的数据中恢复出原始的无噪声数据,从而实现对噪声的去除。
sample() 是v2 一样的吧
这段代码定义了一个sample
方法,主要用于在目标检测任务中分配预测框(bounding boxes)到真实框(ground truth boxes)。具体来说,该方法根据预测的分类和边界框(box)与目标分类和边界框的匹配程度,计算出一个成本矩阵(cost matrix),然后利用匈牙利算法(也称为KM算法,即linear_sum_assignment
)进行最优匹配。
以下是对代码各部分的详细解释:
-
函数参数:
cls_pred
:预测的类别分数,形状为(bs, num_pred, num_cls)
,其中bs
是批次大小,num_pred
是预测的边界框数量,num_cls
是类别数量。box_pred
:预测的边界框坐标。cls_target
:目标类别标签。box_target
:目标边界框坐标。
-
计算分类成本:
cls_cost = self._cls_cost(cls_pred, cls_target)
:根据预测和目标的类别标签计算分类成本。
-
处理边界框目标:
box_target = self.encode_reg_target(box_target, box_pred.device)
:对目标边界框进行编码,并确保它们与预测边界框在同一个设备上。
-
计算实例回归权重:
- 循环遍历每个实例(即每个目标边界框),计算回归权重。这些权重决定了在优化过程中应如何重视每个边界框的坐标。
- 如果提供了类别特定的回归权重(
self.cls_wise_reg_weights
),则根据目标的类别调整权重。
-
计算边界框成本:
box_cost = self._box_cost(box_pred, box_target, instance_reg_weights)
:根据预测的边界框、目标边界框和回归权重计算边界框成本。
box_target = self.encode_reg_target(box_target, box_pred.device)
instance_reg_weights = []
for i in range(len(box_target)):
weights = torch.logical_not(box_target[i].isnan()).to(
dtype=box_target[i].dtype
)
if self.cls_wise_reg_weights is not None:
for cls, weight in self.cls_wise_reg_weights.items():
weights = torch.where(
(cls_target[i] == cls)[:, None],
weights.new_tensor(weight),
weights,
)
instance_reg_weights.append(weights)
box_cost = self._box_cost(box_pred, box_target, instance_reg_weights)
- 最优匹配:
- 对于每个批次,如果分类成本和边界框成本都有效,则计算总成本矩阵,并使用匈牙利算法进行最优匹配。
- 如果某个批次没有目标框(即
cls_target[i]
为空),则跳过该批次。 - 匹配结果存储在
indices
列表中,其中每个元素是一个元组,包含预测框的索引和目标框的索引。
indices = []
for i in range(bs):
if cls_cost[i] is not None and box_cost[i] is not None:
cost = (cls_cost[i] + box_cost[i]).detach().cpu().numpy()
cost = np.where(np.isneginf(cost) | np.isnan(cost), 1e8, cost)
assign = linear_sum_assignment(cost)
indices.append(
[cls_pred.new_tensor(x, dtype=torch.int64) for x in assign]
)
else:
indices.append([None, None])
-
更新输出目标:
- 初始化输出类别目标、边界框目标和回归权重为特定的初始值。
- 遍历
indices
列表,将匹配的类别目标和边界框目标更新到相应的输出中。
这个sample
方法通常用于目标检测算法中的“标签分配”或“匹配”步骤,是训练过程中的一个关键部分。
Quality Estimation
这个在 detection3d_blocks.py
中的with_quality_estimation
if with_quality_estimation:
self.quality_layers = nn.Sequential(
*linear_relu_ln(embed_dims, 1, 2),
Linear(self.embed_dims, 2),
)
这个看着是比较简单的一个层,主要是思想:对于3D检测任务,我们使用以下公式定义了两个质量度量:中心度和yawness度。一方面加快收敛,另一方面合理化预测排名。
在网络输出分类置信度的同时,它还估计了中心度和yawness度。它们各自的损失函数被定义为交叉熵损失和焦点损失。