Title: Reading the PointNeXt 3D Point Cloud Deep Network Source Code (III): Backbone Network Models (BaseSeg, PointNextEncoder, PointNextDecoder, SetAbstraction, and FeaturePropogation)
Table of Contents
- Preface
- I. The Overall Model: Tier 0
- 1. Building the Model Object
- 2. The BaseSeg Model Class
- II. Encoder and Decoder: Tier 1
- 1. The PointNextEncoder Class
- 2. The PointNextDecoder Class
- 3. The SegHead Class
- III. Set Abstraction and Feature Propagation: Tier 2
- 1. The SetAbstraction Class
- 2. The FeaturePropogation Class
- Summary
Related Posts
[1] Installing, Configuring, and Testing the 3D Point Cloud Deep Network PointNeXt
[2] Reading the PointNeXt Source Code (I): Registration Mechanism and Argument Parsing
[3] Reading the PointNeXt Source Code (II): Point Cloud Dataset Construction and Preprocessing
[4] Reading the PointNeXt Source Code (III): Backbone Network Models ⇐ this post
Thanks to the original author Guocheng Qian for open-sourcing PointNeXt for us to learn from!
Preface
Having finished the analysis in the previous posts, we now study how the PointNeXt network model is constructed and implemented.
Running
CUDA_VISIBLE_DEVICES=0,1 python examples/segmentation/main.py \
--cfg cfgs/s3dis/pointnext-s.yaml mode=train
produces PointNeXt-S, the simplest network in the PointNeXt family.
Below we analyze this PointNeXt-S network instance from the whole down to its parts.
I. The Overall Model: Tier 0
1. Building the Model Object
The main() function in main.py builds the PointNeXt deep neural network model:
model = build_model_from_cfg(cfg.model).to(cfg.rank)
Via the registration mechanism, the statement actually executed is the line
obj_cls(**obj_cfg)
inside the build_from_cfg() function in openpoints/utils/registry.py. With its variables expanded, this amounts to
openpoints.models.segmentation.base_seg.BaseSeg(**obj_cfg)
where the argument obj_cfg is:
encoder_args:
  NAME: PointNextEncoder
  blocks: [1, 1, 1, 1, 1]
  strides: [1, 4, 4, 4, 4]
  sa_layers: 2
  sa_use_res: True
  width: 32
  in_channels: 4
  expansion: 4
  radius: 0.1
  nsample: 32
  aggr_args:
    feature_type: dp_fj
    reduction: max
  group_args:
    NAME: ballquery
    normalize_dp: True
  conv_args:
    order: conv-norm-act
  act_args:
    act: relu
  norm_args:
    norm: bn
decoder_args:
  NAME: PointNextDecoder
cls_args:
  NAME: SegHead
  num_classes: 13
  in_channels: None
  norm_args:
    norm: bn
in_channels: 4
For background on the above, see Reading the PointNeXt Source Code (I): Registration Mechanism and Argument Parsing.
Now the automatic construction of the PointNeXt deep neural network model begins:
from the dictionary-style configuration above, the required 3D point cloud deep learning architecture is generated automatically.
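To make the register-then-build flow concrete, here is a minimal, self-contained sketch of the idea. This is not the openpoints implementation (there, MODELS is a Registry object and build_from_cfg handles nested configs and extra kwargs); it only illustrates the mechanism:
import copy

MODELS = {}  # a plain dict standing in for the openpoints Registry

def register_module(cls):
    # decorator: record the class under its own name at import time
    MODELS[cls.__name__] = cls
    return cls

def build_from_cfg(cfg):
    # pop 'NAME', look up the class, and instantiate it with the remaining keys
    cfg = copy.deepcopy(cfg)
    return MODELS[cfg.pop('NAME')](**cfg)

@register_module
class BaseSeg:
    def __init__(self, num_classes=13, **kwargs):
        self.num_classes = num_classes

model = build_from_cfg({'NAME': 'BaseSeg', 'num_classes': 13})
print(type(model).__name__, model.num_classes)  # BaseSeg 13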
2. The BaseSeg Model Class
The annotated source of the BaseSeg base model class:
# The class BaseSeg is decorated with @MODELS.register_module(),
# which registers the class itself in the MODELS registry, roughly MODELS.register_module(module=BaseSeg).
# The decoration runs at import time, before __main__/main() but after the MODELS registry has been created.
# So the class is registered early on, and by the time main() is called the registry can map the string back to the class.
@MODELS.register_module()
class BaseSeg(nn.Module):
def __init__(self,
encoder_args=None,
decoder_args=None,
cls_args=None,
**kwargs):
super().__init__()
self.encoder = build_model_from_cfg(encoder_args)
# encoder_args =
# {'NAME': 'PointNextEncoder',
# 'blocks': [1, 1, 1, 1, 1],
# 'strides': [1, 4, 4, 4, 4],
# 'sa_layers': 2,
# 'sa_use_res': True,
# 'width': 32,
# 'in_channels': 4,
# 'expansion': 4,
# 'radius': 0.1,
# 'nsample': 32,
# 'aggr_args': {'feature_type': 'dp_fj', 'reduction': 'max'},
# 'group_args': {'NAME': 'ballquery', 'normalize_dp': True},
# 'conv_args': {'order': 'conv-norm-act'},
# 'act_args': {'act': 'relu'},
# 'norm_args': {'norm': 'bn'}
# }
#
# Recursively calls build.build_model_from_cfg to create the encoder part of the BaseSeg model
# return obj_cls(**obj_cfg)
# obj_cls = <class 'openpoints.models.backbone.pointnext.PointNextEncoder'>
# obj_cfg is encoder_args with the 'NAME' entry removed
# equivalent to calling PointNextEncoder(**obj_cfg)
if decoder_args is not None:
decoder_args_merged_with_encoder = copy.deepcopy(encoder_args)
decoder_args_merged_with_encoder.update(decoder_args)
# decoder_args = {'NAME': 'PointNextDecoder'}
# so the merge only replaces the 'NAME' entry of decoder_args_merged_with_encoder, i.e.
# decoder_args_merged_with_encoder['NAME'] = 'PointNextDecoder'
# everything else stays the same
decoder_args_merged_with_encoder.encoder_channel_list = self.encoder.channel_list if hasattr(self.encoder,
'channel_list') else None
# self.encoder.channel_list = [32, 64, 128, 256, 512]
# adds the new encoder_channel_list parameter
self.decoder = build_model_from_cfg(decoder_args_merged_with_encoder)
# decoder_args_merged_with_encoder =
# {'NAME': 'PointNextDecoder',
# 'blocks': [1, 1, 1, 1, 1],
# 'strides': [1, 4, 4, 4, 4],
# 'sa_layers': 2,
# 'sa_use_res': True,
# 'width': 32,
# 'in_channels': 4,
# 'expansion': 4,
# 'radius': 0.1,
# 'nsample': 32,
# 'aggr_args': {'feature_type': 'dp_fj', 'reduction': 'max'},
# 'group_args': {'NAME': 'ballquery', 'normalize_dp': True},
# 'conv_args': {'order': 'conv-norm-act'},
# 'act_args': {'act': 'relu'},
# 'norm_args': {'norm': 'bn'},
# 'encoder_channel_list': [32, 64, 128, 256, 512]
# }
#
# Recursively calls build.build_model_from_cfg to create the decoder part of the BaseSeg model
# return obj_cls(**obj_cfg)
# obj_cls = <class 'openpoints.models.backbone.pointnext.PointNextDecoder'>
# obj_cfg is decoder_args_merged_with_encoder with the 'NAME' entry removed
# equivalent to calling PointNextDecoder(**obj_cfg)
else:
self.decoder = None
if cls_args is not None:
if hasattr(self.decoder, 'out_channels'):
in_channels = self.decoder.out_channels
# in_channels = 32
elif hasattr(self.encoder, 'out_channels'):
in_channels = self.encoder.out_channels
else:
in_channels = cls_args.get('in_channels', None)
cls_args.in_channels = in_channels
self.head = build_model_from_cfg(cls_args)
# cls_args =
# {'NAME': 'SegHead',
# 'num_classes': 13,
# 'in_channels': 32,
# 'norm_args': {'norm': 'bn'}
# }
#
# Recursively calls build.build_model_from_cfg to create the head part of the BaseSeg model
# return obj_cls(**obj_cfg)
# obj_cls = <class 'openpoints.models.segmentation.base_seg.SegHead'>
# obj_cfg is cls_args with the 'NAME' entry removed
# equivalent to calling SegHead(**obj_cfg)
else:
self.head = None
def forward(self, data):
p, f = self.encoder.forward_seg_feat(data)
# p holds the positions "pos", f holds the features "x"
# forward pass of the PointNextEncoder object self.encoder
if self.decoder is not None:
f = self.decoder(p, f).squeeze(-1)
# passing arguments to the PointNextDecoder object self.decoder makes PyTorch call its forward(...)
if self.head is not None:
f = self.head(f)
# passing arguments to the SegHead object self.head makes PyTorch call its forward(...)
return f
The forward pass is illustrated in Fig. 1:
The overall BaseSeg model is divided into three modules: encoder, decoder, and head, each of which is in turn composed of finer-grained modules.
We analyze the principles and details of each level, layer by layer.
Since BaseSeg sits at the top level, we label it Tier 0;
PointNextEncoder, PointNextDecoder, and SegHead sit one level down at Tier 1;
and so on.
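As a quick orientation, the sketch below shows the tensor shapes flowing through BaseSeg.forward under the S3DIS config quoted above. The shapes come from the debugging notes in this post; the batch size is reduced from 32 to 2, and the build call is left commented out because it requires the openpoints package and a parsed cfg:
import torch

data = {
    'pos': torch.rand(2, 24000, 3),  # point coordinates [batch, points, xyz]
    'x': torch.rand(2, 4, 24000),    # per-point input features, in_channels = 4
}
# model = build_model_from_cfg(cfg.model)  # as in examples/segmentation/main.py
# logits = model(data)                     # expected shape [2, 13, 24000]: class scores per point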
II. Encoder and Decoder: Tier 1
1. The PointNextEncoder Class
The annotated source of the PointNextEncoder class:
@MODELS.register_module()
class PointNextEncoder(nn.Module):
r"""The Encoder for PointNext
`"PointNeXt: Revisiting PointNet++ with Improved Training and Scaling Strategies".
<https://arxiv.org/abs/2206.04670>`_.
.. note::
For an example of using :obj:`PointNextEncoder`, see
`examples/segmentation/main.py <https://github.com/guochengqian/PointNeXt/blob/master/cfgs/s3dis/README.md>`_.
Args:
in_channels (int, optional): input channels. Defaults to 4.
width (int, optional): width of network, the output mlp of the stem MLP. Defaults to 32.
blocks (List[int], optional): # of blocks per stage (including the SA block). Defaults to [1, 4, 7, 4, 4].
strides (List[int], optional): the downsampling ratio of each stage. Defaults to [4, 4, 4, 4].
block (str or Type[InvResMLP], optional): the block to use for depth scaling. Defaults to 'InvResMLP'.
nsample (int or List[int], optional): the number of neighbors to query for each block. Defaults to 32.
radius (float or List[float], optional): the initial radius. Defaults to 0.1.
aggr_args (_type_, optional): the args for local aggregation. Defaults to {'feature_type': 'dp_fj', "reduction": 'max'}.
group_args (_type_, optional): the args for grouping. Defaults to {'NAME': 'ballquery'}.
norm_args (_type_, optional): the args for normalization layer. Defaults to {'norm': 'bn'}.
act_args (_type_, optional): the args for activation layer. Defaults to {'act': 'relu'}.
expansion (int, optional): the expansion ratio of the InvResMLP block. Defaults to 4.
sa_layers (int, optional): the number of MLP layers to use in the SA block. Defaults to 1.
sa_use_res (bool, optional): whether to use residual connection in SA block. Set to True only for PointNeXt-S.
"""
def __init__(self,
in_channels: int = 4,
width: int = 32,
blocks: List[int] = [1, 4, 7, 4, 4],
strides: List[int] = [4, 4, 4, 4],
block: str or Type[InvResMLP] = 'InvResMLP',
nsample: int or List[int] = 32,
radius: float or List[float] = 0.1,
aggr_args: dict = {'feature_type': 'dp_fj', "reduction": 'max'},
group_args: dict = {'NAME': 'ballquery'},
sa_layers: int = 1,
sa_use_res: bool = False,
**kwargs
):
super().__init__()
if isinstance(block, str):
block = eval(block) # <class 'openpoints.models.backbone.pointnext.InvResMLP'>
self.blocks = blocks
self.strides = strides
self.in_channels = in_channels
self.aggr_args = aggr_args
self.norm_args = kwargs.get('norm_args', {'norm': 'bn'})
self.act_args = kwargs.get('act_args', {'act': 'relu'})
self.conv_args = kwargs.get('conv_args', None)
self.sampler = kwargs.get('sampler', 'fps')  # defaults to 'fps' when the key is absent
self.expansion = kwargs.get('expansion', 4)
self.sa_layers = sa_layers
self.sa_use_res = sa_use_res
self.use_res = kwargs.get('use_res', True)
radius_scaling = kwargs.get('radius_scaling', 2)
nsample_scaling = kwargs.get('nsample_scaling', 1)
self.radii = self._to_full_list(radius, radius_scaling)
# [[0.1], [0.1], [0.2], [0.4], [0.8]] in PointNeXt-S
self.nsample = self._to_full_list(nsample, nsample_scaling)
# [[32], [32], [32], [32], [32]] in PointNeXt-S
logging.info(f'radius: {self.radii},\n nsample: {self.nsample}')
# double width after downsampling.
channels = []
for stride in strides:
if stride != 1:
width *= 2
channels.append(width)
# self.strides = [1, 4, 4, 4, 4]
# channels = [32, 64, 128, 256, 512]
# the width doubles at each downsampling stage
encoder = []  # the encoder will consist of 5 finer-grained modules
for i in range(len(blocks)): # blocks = [1, 1, 1, 1, 1]
group_args.radius = self.radii[i]
group_args.nsample = self.nsample[i]
encoder.append(self._make_enc(
block, channels[i], blocks[i], stride=strides[i], group_args=group_args,
is_head=i == 0 and strides[i] == 1
))
# is_head = True only when i = 0; False otherwise
self.encoder = nn.Sequential(*encoder)
self.out_channels = channels[-1]  # the last stage's width is this module's output channel count
self.channel_list = channels
def _to_full_list(self, param, param_scaling=1):
# param can be: radius, nsample
param_list = []
if isinstance(param, List):
# make param a full list
for i, value in enumerate(param):
value = [value] if not isinstance(value, List) else value
if len(value) != self.blocks[i]:
value += [value[-1]] * (self.blocks[i] - len(value))
param_list.append(value)
else:  # radius is a scalar (only the initial radius is provided); build a per-block radius list
for i, stride in enumerate(self.strides):
if stride == 1:
param_list.append([param] * self.blocks[i])
else:
param_list.append(
[param] + [param * param_scaling] * (self.blocks[i] - 1))
param *= param_scaling
return param_list
def _make_enc(self, block, channels, blocks, stride, group_args, is_head=False): # "_make_encode"
layers = []
radii = group_args.radius
nsample = group_args.nsample
group_args.radius = radii[0]
group_args.nsample = nsample[0]
layers.append(SetAbstraction(self.in_channels, channels,
self.sa_layers if not is_head else 1, stride,
group_args=group_args,
sampler=self.sampler,
norm_args=self.norm_args, act_args=self.act_args, conv_args=self.conv_args,
is_head=is_head, use_res=self.sa_use_res, **self.aggr_args
))
# the encoder consists of 5 finer-grained SetAbstraction modules
self.in_channels = channels
for i in range(1, blocks):
# PointNeXt-S has B = 0: blocks = 1, so range(1, blocks) is empty and no InvResMLP appears anywhere in the network
group_args.radius = radii[i]
group_args.nsample = nsample[i]
layers.append(block(self.in_channels,
aggr_args=self.aggr_args,
norm_args=self.norm_args, act_args=self.act_args, group_args=group_args,
conv_args=self.conv_args, expansion=self.expansion,
use_res=self.use_res
))
return nn.Sequential(*layers)
def forward_cls_feat(self, p0, f0=None):
if hasattr(p0, 'keys'):
p0, f0 = p0['pos'], p0.get('x', None)
if f0 is None:
f0 = p0.clone().transpose(1, 2).contiguous()
for i in range(0, len(self.encoder)):
p0, f0 = self.encoder[i]([p0, f0])
return f0.squeeze(-1)
def forward_seg_feat(self, p0, f0=None): # p0 = data
if hasattr(p0, 'keys'): # True
p0, f0 = p0['pos'], p0.get('x', None)
# p0.shape = torch.Size([32, 24000, 3])
# f0.shape = torch.Size([32, 4, 24000])
if f0 is None:
f0 = p0.clone().transpose(1, 2).contiguous()
p, f = [p0], [f0]
for i in range(0, len(self.encoder)): # len(self.encoder) = 5
_p, _f = self.encoder[i]([p[-1], f[-1]])
# p[-1] is the last element of list p, i.e. the output of the previous encoder stage; likewise f[-1]
p.append(_p)
f.append(_f)
# p and f accumulate the initial "pos" position data and "x" feature data,
# plus the _p and _f outputs of every self.encoder[i] stage
return p, f
def forward(self, p0, f0=None):
return self.forward_seg_feat(p0, f0)
# the forward pass
The forward pass of the PointNextEncoder object encoder is shown in Fig. 2:
Notice that as the encoder's forward pass proceeds, the number of points keeps shrinking while the feature dimension keeps growing.
The concrete implementation details only become clear after a close reading of the SetAbstraction class (below).
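Before moving on, a small standalone sketch of the _to_full_list logic for the scalar-radius case, using the PointNeXt-S values (radius = 0.1, radius_scaling = 2, strides = [1, 4, 4, 4, 4], blocks = [1, 1, 1, 1, 1]); it reproduces the radii list [[0.1], [0.1], [0.2], [0.4], [0.8]] noted in the comments above:
radius, radius_scaling = 0.1, 2
strides, blocks = [1, 4, 4, 4, 4], [1, 1, 1, 1, 1]
radii = []
for stride, b in zip(strides, blocks):
    if stride == 1:
        radii.append([radius] * b)          # no downsampling: keep the current radius
    else:
        # the first block keeps the current radius, later blocks use the scaled one
        radii.append([radius] + [radius * radius_scaling] * (b - 1))
        radius *= radius_scaling            # double the radius for the next stage
print(radii)  # [[0.1], [0.1], [0.2], [0.4], [0.8]]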
2. The PointNextDecoder Class
The annotated source of the PointNextDecoder class:
@MODELS.register_module()
class PointNextDecoder(nn.Module):
def __init__(self,
encoder_channel_list: List[int],
decoder_layers: int = 2,
decoder_stages: int = 4,
**kwargs
):
super().__init__()
self.decoder_layers = decoder_layers # 2
self.in_channels = encoder_channel_list[-1]
# 512: the encoder's last channel count becomes the decoder's input channel count
skip_channels = encoder_channel_list[:-1] # [32, 64, 128, 256]
if len(skip_channels) < decoder_stages:
skip_channels.insert(0, kwargs.get('in_channels', 3))
# the output channel after interpolation
fp_channels = encoder_channel_list[:decoder_stages]
# [32, 64, 128, 256]: the output channel count of each Feature Propagation module
n_decoder_stages = len(fp_channels) # 4
decoder = [[] for _ in range(n_decoder_stages)] # decoder = [[], [], [], []]
for i in range(-1, -n_decoder_stages - 1, -1): # -1, -2, -3, -4
decoder[i] = self._make_dec(
skip_channels[i], fp_channels[i])
# the decoder consists of 4 finer-grained FeaturePropogation modules
self.decoder = nn.Sequential(*decoder)
self.out_channels = fp_channels[-n_decoder_stages]
# note the order is reversed: the smallest channel count becomes the output channel count
def _make_dec(self, skip_channels, fp_channels): # "_make_decoder"
layers = []
mlp = [skip_channels + self.in_channels] + \
[fp_channels] * self.decoder_layers
# list concatenation
# mlp = [768, 256, 256], [384, 128, 128], [192, 64, 64], [96, 32, 32]
# the "[skip_channels + self.in_channels]" term arises because, after the Feature Propagation
# module interpolates the skip-connection features, it concatenates along the channel
# dimension, raising the channel count to mlp[0] = skip_channels + self.in_channels
layers.append(FeaturePropogation(mlp))
# create one FeaturePropogation object
self.in_channels = fp_channels  # the current output channel count is the next stage's input channel count
return nn.Sequential(*layers)
def forward(self, p, f):
for i in range(-1, -len(self.decoder) - 1, -1):
f[i - 1] = self.decoder[i][1:](
[p[i], self.decoder[i][0]([p[i - 1], f[i - 1]], [p[i], f[i]])])[1]
# in the decoder, p (the point positions "pos") stays as handed over from the encoder
# note that the FeaturePropogation objects inside decoder are applied in reverse order
# self.decoder[i][1:]() is an empty Sequential(), so the outer call has no effect here
# f[i - 1] = self.decoder[i][0]([p[i - 1], f[i - 1]], [p[i], f[i]])
# corresponds to the forward pass of FeaturePropogation
# [p[i - 1], f[i - 1]] is the skip-connection input, as in U-Net
# only the feature part "x" (f[i - 1]) is updated; the position part "pos" (p[i - 1]) is not
return f[-len(self.decoder) - 1]
The forward pass of the PointNextDecoder object decoder is shown in Fig. 3:
Clearly PointNeXt is also built on the U-Net architecture. What sets U-Net apart from many other deep networks is that it classifies every pixel/point rather than a whole image or point cloud scene, which is exactly what image and point cloud semantic segmentation require.
More details of PointNextDecoder require digging into the FeaturePropogation class (below).
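The channel bookkeeping in _make_dec is easy to verify in isolation. The sketch below reproduces the four mlp lists reported in the comments above, using encoder_channel_list = [32, 64, 128, 256, 512] from PointNeXt-S:
encoder_channel_list = [32, 64, 128, 256, 512]
in_channels = encoder_channel_list[-1]      # 512, the decoder's input channel count
skip_channels = encoder_channel_list[:-1]   # [32, 64, 128, 256]
fp_channels = encoder_channel_list[:4]      # [32, 64, 128, 256]
decoder_layers = 2
for i in range(-1, -5, -1):                 # deepest stage first: -1, -2, -3, -4
    mlp = [skip_channels[i] + in_channels] + [fp_channels[i]] * decoder_layers
    in_channels = fp_channels[i]            # this stage's output feeds the next stage
    print(mlp)
# [768, 256, 256]
# [384, 128, 128]
# [192, 64, 64]
# [96, 32, 32]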
3. The SegHead Class
A 1D convolution with kernel size 1 is equivalent to a fully connected layer applied across all channel values of a single point, and it outputs that point's classification result.
The rest is fairly straightforward and omitted here; a sketch of such a head follows.
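A minimal sketch of a SegHead-style classifier built on this idea. The layer structure here is assumed for illustration and not copied from openpoints; only the kernel-size-1 Conv1d principle comes from the text, and the channel counts follow the cls_args above:
import torch
import torch.nn as nn

num_classes, in_channels = 13, 32   # values from the cls_args quoted earlier
head = nn.Sequential(
    nn.Conv1d(in_channels, in_channels, kernel_size=1, bias=False),
    nn.BatchNorm1d(in_channels),    # norm_args: {'norm': 'bn'}
    nn.ReLU(inplace=True),
    nn.Conv1d(in_channels, num_classes, kernel_size=1),  # per-point classification
)
f = torch.rand(2, in_channels, 24000)  # [batch, channels, points] from the decoder
logits = head(f)                       # [2, 13, 24000]: one class-score vector per point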
III. Set Abstraction and Feature Propagation: Tier 2
1. The SetAbstraction Class
The PointNeXt-S network contains only SetAbstraction modules and no InvResMLP modules, which simplifies things considerably.
The annotated source of the SetAbstraction class follows; the forward pass of a SetAbstraction object is shown in Fig. 2.
class SetAbstraction(nn.Module):
"""The modified set abstraction module in PointNet++ with residual connection support
"""
def __init__(self,
in_channels, out_channels,
layers=1,
stride=1,
group_args={'NAME': 'ballquery',
'radius': 0.1, 'nsample': 16},
norm_args={'norm': 'bn1d'},
act_args={'act': 'relu'},
conv_args=None,
sampler='fps',
feature_type='dp_fj',
use_res=False,
is_head=False,
**kwargs,
):
super().__init__()
self.stride = stride  # the downsampling stride, kept at 4 throughout
self.is_head = is_head
self.all_aggr = not is_head and stride == 1 # False
self.use_res = use_res and not self.all_aggr and not self.is_head # encoder[1] True
self.feature_type = feature_type # 'dp_fj'
mid_channel = out_channels // 2 if stride > 1 else out_channels # encoder[1] 32
channels = [in_channels] + [mid_channel] * \
(layers - 1) + [out_channels]
channels[0] = in_channels if is_head else CHANNEL_MAP[feature_type](channels[0])
# CHANNEL_MAP = {
# 'dp_fj': lambda x: 3 + x,
# }
# encoder[0]'s channel: [4, 32]
# encoder[1]'s channel: [35, 32, 64]
if self.use_res:  # encoder[0]: False; others: True
self.skipconv = create_convblock1d(
in_channels, channels[-1], norm_args=None, act_args=None) if in_channels != channels[
-1] else nn.Identity()
self.act = create_act(act_args)
# actually, one can use local aggregation layer to replace the following
create_conv = create_convblock1d if is_head else create_convblock2d
# encoder[0]: create_convblock1d; encoder[1]: create_convblock2d
convs = []
for i in range(len(channels) - 1):
convs.append(create_conv(channels[i], channels[i + 1],
norm_args=norm_args if not is_head else None,
act_args=None if i == len(channels) - 2
and (self.use_res or is_head) else act_args,
**conv_args)
)
self.convs = nn.Sequential(*convs)
if not is_head:
if self.all_aggr: # False
group_args.nsample = None
group_args.radius = None
self.grouper = create_grouper(group_args)  # neighborhood grouping method QueryAndGroup
self.pool = lambda x: torch.max(x, dim=-1, keepdim=False)[0]
if sampler.lower() == 'fps':
self.sample_fn = furthest_point_sample  # the downsampling method
elif sampler.lower() == 'random':
self.sample_fn = random_sample
def forward(self, pf):
p, f = pf # p.shape = torch.Size([32, 24000, 3]), f.shape = torch.Size([32, 32, 24000])
if self.is_head:
f = self.convs(f) # (n, c)
else:
if not self.all_aggr:  # executed for encoder[1]
idx = self.sample_fn(p, p.shape[1] // self.stride).long()
# encoder[1]'s idx.shape = torch.Size([32, 6000])
# self.stride is the downsampling stride
new_p = torch.gather(p, 1, idx.unsqueeze(-1).expand(-1, -1, 3))
# sample p along dim=1 with idx
# input p.shape = torch.Size([32, 24000, 3]); after downsampling new_p.shape = torch.Size([32, 6000, 3])
else:
new_p = p
""" DEBUG neighbor numbers.
query_xyz, support_xyz = new_p, p
radius = self.grouper.radius
dist = torch.cdist(query_xyz.cpu(), support_xyz.cpu())
points = len(dist[dist < radius]) / (dist.shape[0] * dist.shape[1])
logging.info(f'query size: {query_xyz.shape}, support size: {support_xyz.shape}, radius: {radius}, num_neighbors: {points}')
DEBUG end """
if self.use_res or 'df' in self.feature_type: # self.feature_type = 'dp_fj'
fi = torch.gather(
f, -1, idx.unsqueeze(1).expand(-1, f.shape[1], -1))
# sample f along the last dim with idx
# feature f.shape = torch.Size([32, 32, 24000]); after downsampling fi.shape = torch.Size([32, 32, 6000])
if self.use_res: # True
identity = self.skipconv(fi)
# identity.shape = torch.Size([32, 64, 6000])
# this is the convolution of the downsampled point features themselves
else:
fi = None
dp, fj = self.grouper(new_p, p, f)
# groups the points in the neighborhood of each point of new_p,
# i.e. for each downsampled point, gathers the neighboring points and their features into a group
# dp.shape = torch.Size([32, 3, 6000, 32])
# fj.shape = torch.Size([32, 32, 6000, 32])
fj = get_aggregation_feautres(new_p, dp, fi, fj, feature_type=self.feature_type)
# dp and fj are concatenated along dim=1, so the new fj.shape = torch.Size([32, 35, 6000, 32])
# fj then has 35 channels/feature dims, matching the input channel count of the upcoming 2D convolution
# dims 2 and 3 are convolved with a kernel_size = (1, 1) 2D kernel
f = self.pool(self.convs(fj))
# self.convs(fj) outputs a tensor of shape torch.Size([32, 64, 6000, 32])
# self.pool takes the max over the last dimension, giving f.shape = torch.Size([32, 64, 6000])
# the grouping and pooling involved here are invariant to point order: they depend only on relative point positions
# the result is the convolution of each downsampled point's neighborhood feature group
if self.use_res:
f = self.act(f + identity)
# "f + identity" is the residual connection:
# identity is the convolution of the downsampled point features themselves,
# f is the convolution of each downsampled point's neighborhood feature group
p = new_p  # the downsampled point cloud
return p, f
This is how point cloud features are extracted: the PointNextEncoder module chains SetAbstraction modules to obtain deeper and deeper features, layer by layer.
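The gather-based downsampling above is worth replaying in isolation. In the sketch below, random index selection stands in for furthest_point_sample (the real one is a compiled CUDA op in openpoints), but the torch.gather calls mirror the ones in forward:
import torch

B, N, C, stride = 2, 24000, 32, 4
p = torch.rand(B, N, 3)   # positions [batch, points, xyz]
f = torch.rand(B, C, N)   # features  [batch, channels, points]
# stand-in for furthest_point_sample(p, N // stride): pick N // stride indices per batch
idx = torch.stack([torch.randperm(N)[:N // stride] for _ in range(B)])   # [B, 6000]
new_p = torch.gather(p, 1, idx.unsqueeze(-1).expand(-1, -1, 3))          # [B, 6000, 3]
fi = torch.gather(f, -1, idx.unsqueeze(1).expand(-1, C, -1))             # [B, 32, 6000]
print(new_p.shape, fi.shape)  # torch.Size([2, 6000, 3]) torch.Size([2, 32, 6000])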
2. The FeaturePropogation Class
The annotated source of the FeaturePropogation class follows; the forward pass of a FeaturePropogation object is shown in Fig. 3. The key steps are feature interpolation and feature fusion.
class FeaturePropogation(nn.Module):
"""The Feature Propogation module in PointNet++
"""
def __init__(self, mlp,
upsample=True,
norm_args={'norm': 'bn1d'},
act_args={'act': 'relu'}
):
"""
Args:
mlp: [current_channels, next_channels, next_channels]
out_channels:
norm_args:
act_args:
"""
super().__init__()
if not upsample:
self.linear2 = nn.Sequential(
nn.Linear(mlp[0], mlp[1]), nn.ReLU(inplace=True))
mlp[1] *= 2
linear1 = []
for i in range(1, len(mlp) - 1):
linear1.append(create_convblock1d(mlp[i], mlp[i + 1],
norm_args=norm_args, act_args=act_args
))
self.linear1 = nn.Sequential(*linear1)
else:
convs = []
for i in range(len(mlp) - 1):
convs.append(create_convblock1d(mlp[i], mlp[i + 1],
norm_args=norm_args, act_args=act_args
))
self.convs = nn.Sequential(*convs)
self.pool = lambda x: torch.mean(x, dim=-1, keepdim=False)
def forward(self, pf1, pf2=None):
# pf1 has the same size as the upsampled points
if pf2 is None:
_, f = pf1 # (B, N, 3), (B, C, N)
f_global = self.pool(f)
f = torch.cat(
(f, self.linear2(f_global).unsqueeze(-1).expand(-1, -1, f.shape[-1])), dim=1)
f = self.linear1(f)
else:
p1, f1 = pf1
p2, f2 = pf2
# p1.shape = [BatchSize, N-Points, XYZ]
# f1.shape = [BatchSize, Features, N-Points]
# p1, f1 correspond to the skip connection
# p1, f1: more points, but lower feature dimension; these are the points whose features need interpolation
# p2, f2: fewer points, but higher feature dimension; they supply the raw material for the interpolation
if f1 is not None:
f = self.convs(
torch.cat((f1, three_interpolation(p1, p2, f2)), dim=1))
# three_interpolation(p1, p2, f2) interpolates features: each point of p1 takes the features
# of its three nearest points in p2, combined by interpolation
# the result has as high a feature dimension (channels) as f2 and as many entries (points) as p1
# torch.cat((f1, three_interpolation(p1, p2, f2)), dim=1) concatenates along the feature/channel
# dimension, raising the channel count to mlp[0]
# example:
# f1.shape = torch.Size([32, 256, 375])
# p1.shape = torch.Size([32, 375, 3])
# f2.shape = torch.Size([32, 512, 93])
# p2.shape = torch.Size([32, 93, 3])
# three_interpolation(p1, p2, f2).shape = torch.Size([32, 512, 375])
# torch.cat((f1, three_interpolation(p1, p2, f2)), dim=1).shape = torch.Size([32, 768, 375])
# this fuses the two levels of features: f1 is the shallower one, f2 the deeper one
# the fused feature is then fed through the convolution stack
else:
f = self.convs(three_interpolation(p1, p2, f2))
return f
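To make the interpolation step tangible, here is a pure-PyTorch sketch of what three_interpolation computes. The real implementation is a compiled CUDA op in openpoints; inverse-distance weighting over the 3 nearest neighbors is the standard PointNet++ scheme and is assumed here:
import torch

def three_interpolate(p1, p2, f2):
    # p1: [B, N, 3] dense points, p2: [B, M, 3] sparse points, f2: [B, C, M] -> [B, C, N]
    dist = torch.cdist(p1, p2)                      # pairwise distances [B, N, M]
    d3, idx = dist.topk(3, dim=-1, largest=False)   # 3 nearest p2 neighbors per p1 point
    w = 1.0 / (d3 + 1e-8)
    w = w / w.sum(dim=-1, keepdim=True)             # normalized inverse-distance weights
    B, C, _ = f2.shape
    nbr = torch.gather(f2.unsqueeze(2).expand(-1, -1, p1.shape[1], -1),
                       3, idx.unsqueeze(1).expand(-1, C, -1, -1))  # [B, C, N, 3]
    return (nbr * w.unsqueeze(1)).sum(dim=-1)       # weighted sum over the 3 neighbors

# shapes from the example in the comments above:
f_up = three_interpolate(torch.rand(2, 375, 3), torch.rand(2, 93, 3), torch.rand(2, 512, 93))
print(f_up.shape)  # torch.Size([2, 512, 375])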
Summary
We studied the basic building blocks of the PointNeXt 3D point cloud deep network: BaseSeg, PointNextEncoder, PointNextDecoder, SetAbstraction, FeaturePropogation, and so on.
These are the higher-level modules of the full PointNeXt model (Tier 0, Tier 1, Tier 2).
InvResMLP is left for later study,
as are the lower-level algorithm sources such as point cloud sampling and feature interpolation.