三维点云深度网络 PointNeXt 源码阅读 (III) —— 骨干网络模型

news2025/1/11 10:47:14

  • 前言
  • I. 整体模型 —— Tier 0
    • 1. 模型对象的建立
    • 2. BaseSeg 模型类
  • II. 编码器与解码器 —— Tier 1
    • 1. PoinrNextEncoder 类
    • 2. PointNextDecoder 类
    • 3. SegHead 类
  • III. 点集提取与特征传播 ——Tier 2
    • 1. SetAbstraction 类
    • 2. FeaturePropogation 类
  • 总结

[1] 三维点云深度网络 PointNeXt 的安装配置与测试

[2] 三维点云深度网络 PointNeXt 源码阅读 (I) —— 注册机制与参数解析

[3] 三维点云深度网络 PointNeXt 源码阅读 (II) —— 点云数据集构造与预处理

[4] 三维点云深度网络 PointNeXt 源码阅读 (III) —— 骨干网络模型 ⇐ \qquad \Leftarrow 本篇

感谢原作者 Guocheng Qian 开源 PointNeXt 供我们学习 !


完成前述博文的分析后, 我们学习和分析一下 PointNeXt 的网络模型的构建方法及实现.


CUDA_VISIBLE_DEVICES=0,1 python examples/segmentation/main.py \
				--cfg cfgs/s3dis/pointnext-s.yaml  mode=train

就会产生三维点云深度网络 PointNeXt 族中最简单的 PointNeXt-S 网络.

下面我们尝试从整体到局部分析学习一下这个 PointNeXt-S 网络实例.

I. 整体模型 —— Tier 0

1. 模型对象的建立

main.py 中的 main() 函数内构建 PointNeXt 深度神经网络模型:

model = build_model_from_cfg(cfg.model).to(cfg.rank)

基于注册机制, 实际调用程序语句为 openpoints/utils/registry.pybuild_from_cgf() 函数内语句




其中参数 obj_cfg

  NAME: PointNextEncoder
  blocks: [1, 1, 1, 1, 1]
  strides: [1, 4, 4, 4, 4]
  sa_layers: 2
  sa_use_res: True
  width: 32
  in_channels: 4
  expansion: 4
  radius: 0.1
  nsample: 32
    feature_type: dp_fj
    reduction: max
    NAME: ballquery
    normalize_dp: True
    order: conv-norm-act
    act: relu
    norm: bn
  NAME: PointNextDecoder
  NAME: SegHead
  num_classes: 13
  in_channels: None
    norm: bn
in_channels: 4 

以上部分的理解可以参考三维点云深度网络 PointNeXt 源码阅读 (I) —— 注册机制与参数解析.

下面开启 PointNeXt 深度神经网络模型的自动构造.

即由上述字典形式的配置参数, 自动生成所需的三维点云深度学习网络架构.

2. BaseSeg 模型类

对 BaseSeg 这一基本模型类的注释如下:

# 为类 BaseSeg 加了装饰器 MODELS.register_module
# 调用 BaseSeg() 创建对象时, 效果相当于调用 MODELS.register_module(module=BaseSeg())
# 程序初始运行对类的装饰, 先于 __main__/main(), 但晚于注册器 MODELS 的建立.
# 所以在程序初始部分, 就以完成类的注册了, 待调用 main() 时, 就能顺利利用注册器将字符串转换为类 
class BaseSeg(nn.Module):
    def __init__(self,
        self.encoder = build_model_from_cfg(encoder_args)
        # encoder_args =
        # {'NAME': 'PointNextEncoder', 
        #  'blocks': [1, 1, 1, 1, 1], 
        #  'strides': [1, 4, 4, 4, 4], 
        #  'sa_layers': 2, 
        #  'sa_use_res': True, 
        #  'width': 32, 
        #  'in_channels': 4, 
        #  'expansion': 4, 
        #  'radius': 0.1, 
        #  'nsample': 32, 
        #  'aggr_args': {'feature_type': 'dp_fj', 'reduction': 'max'}, 
        #  'group_args': {'NAME': 'ballquery', 'normalize_dp': True}, 
        #  'conv_args': {'order': 'conv-norm-act'}, 
        #  'act_args': {'act': 'relu'}, 
        #  'norm_args': {'norm': 'bn'}
        #  }
        # 递归地调用 build.build_model_from_cfg 来生成 BaseSeg 神经网络模型的 encoder 部分
        # return obj_cls(**obj_cfg)
        # obj_cls = <class 'openpoints.models.backbone.pointnext.PointNextEncoder'>
        # obj_cfg 就是 encoder_args 去除 'NAME' 键值对后留下的
        # 相当于调用 PointNextEncoder(**obj_cfg)
        if decoder_args is not None:
            decoder_args_merged_with_encoder = copy.deepcopy(encoder_args)
            # decoder_args = {'NAME': 'PointNextDecoder'}
            # 将 decoder_args_merged_with_encoder 字典中的 'NAME' 键值改一下, 即
            # decoder_args_merged_with_encoder ['NAME'] = 'PointNextDecoder'
            # 其他不变
            decoder_args_merged_with_encoder.encoder_channel_list = self.encoder.channel_list if hasattr(self.encoder,
                                                                                            'channel_list') else None
            # self.encoder.channel_list = [32, 64, 128, 256, 512]
            # 新增 encoder_channel_list 参数
            self.decoder = build_model_from_cfg(decoder_args_merged_with_encoder)
            # decoder_args_merged_with_encoder =
            # {'NAME': 'PointNextDecoder', 
            #  'blocks': [1, 1, 1, 1, 1], 
            #  'strides': [1, 4, 4, 4, 4], 
            #  'sa_layers': 2, 
            #  'sa_use_res': True, 
            #  'width': 32, 
            #  'in_channels': 4, 
            #  'expansion': 4, 
            #  'radius': 0.1, 
            #  'nsample': 32, 
            #  'aggr_args': {'feature_type': 'dp_fj', 'reduction': 'max'}, 
            #  'group_args': {'NAME': 'ballquery', 'normalize_dp': True}, 
            #  'conv_args': {'order': 'conv-norm-act'}, 
            #  'act_args': {'act': 'relu'}, 
            #  'norm_args': {'norm': 'bn'}, 
            #  'encoder_channel_list': [32, 64, 128, 256, 512]
            #  }
            # 递归地调用 build.build_model_from_cfg 来生成 BaseSeg 神经网络模型的 decoder 部分
            # return obj_cls(**obj_cfg)
            # obj_cls = <class 'openpoints.models.backbone.pointnext.PointNextDecoder'>
            # obj_cfg 就是 decoder_args_merged_with_encoder 去除 'NAME' 键值对后留下的
            # 相当于调用 PointNextDecoder(**obj_cfg)
            self.decoder = None

        if cls_args is not None:
            if hasattr(self.decoder, 'out_channels'):
                in_channels = self.decoder.out_channels
                # in_channels = 32
            elif hasattr(self.encoder, 'out_channels'):
                in_channels = self.encoder.out_channels
                in_channels = cls_args.get('in_channels', None)
            cls_args.in_channels = in_channels
            self.head = build_model_from_cfg(cls_args) 
            # cls_args =
            # {'NAME': 'SegHead', 
            #  'num_classes': 13, 
            #  'in_channels': 32, 
            #  'norm_args': {'norm': 'bn'}
            #  }
            # 递归地调用 build.build_model_from_cfg 来生成 BaseSeg 神经网络模型的 head 部分
            # return obj_cls(**obj_cfg)
            # obj_cls = <class 'openpoints.models.segmentation.base_seg.SegHead'>
            # obj_cfg 就是 decoder_args_merged_with_encoder 去除 'NAME' 键值对后留下的
            # 相当于调用 SegHead(**obj_cfg)
            self.head = None

    def forward(self, data):
        p, f = self.encoder.forward_seg_feat(data)
        # p 对应位置 "pos", f 对应特征 "x"
        # PointNextEncoder 对象 self.encoder 的前向传播过程 
        if self.decoder is not None:
            f = self.decoder(p, f).squeeze(-1)
            # 向 PointNextDecoder 对象 self.decoder 传递参数时, Pytorch 自动调用该对象的 forward(...)
        if self.head is not None:
            f = self.head(f)
            # 向 SegHead 对象 self.head 传递参数时, Pytorch 自动调用该对象的 forward(...)
        return f

前向传播过程, 如 Fig. 1:

Fig. 1 BaseSeg 对象的前向传播过程

整体模型 BaseSeg 分为 encoder、decoder 和 head 三个模块. 而这三部分又是由更细粒的模块组成.


因为 BaseSeg 处于最高层记为 Tier 0.

而 PointNextEncoder、PointNextDecoder、SegHead 处于次一层记为 Tier 1.


II. 编码器与解码器 —— Tier 1

1. PoinrNextEncoder 类

对 PointNextEncoder 类的源代码注释如下:

class PointNextEncoder(nn.Module):
    r"""The Encoder for PointNext 
    `"PointNeXt: Revisiting PointNet++ with Improved Training and Scaling Strategies".
    .. note::
        For an example of using :obj:`PointNextEncoder`, see
        `examples/segmentation/main.py <https://github.com/guochengqian/PointNeXt/blob/master/cfgs/s3dis/README.md>`_.
        in_channels (int, optional): input channels . Defaults to 4.
        width (int, optional): width of network, the output mlp of the stem MLP. Defaults to 32.
        blocks (List[int], optional): # of blocks per stage (including the SA block). Defaults to [1, 4, 7, 4, 4].
        strides (List[int], optional): the downsampling ratio of each stage. Defaults to [4, 4, 4, 4].
        block (strorType[InvResMLP], optional): the block to use for depth scaling. Defaults to 'InvResMLP'.
        nsample (intorList[int], optional): the number of neighbors to query for each block. Defaults to 32.
        radius (floatorList[float], optional): the initial radius. Defaults to 0.1.
        aggr_args (_type_, optional): the args for local aggregataion. Defaults to {'feature_type': 'dp_fj', "reduction": 'max'}.
        group_args (_type_, optional): the args for grouping. Defaults to {'NAME': 'ballquery'}.
        norm_args (_type_, optional): the args for normalization layer. Defaults to {'norm': 'bn'}.
        act_args (_type_, optional): the args for activation layer. Defaults to {'act': 'relu'}.
        expansion (int, optional): the expansion ratio of the InvResMLP block. Defaults to 4.
        sa_layers (int, optional): the number of MLP layers to use in the SA block. Defaults to 1.
        sa_use_res (bool, optional): wheter to use residual connection in SA block. Set to True only for PointNeXt-S. 

    def __init__(self,
                 in_channels: int = 4,
                 width: int = 32,
                 blocks: List[int] = [1, 4, 7, 4, 4],
                 strides: List[int] = [4, 4, 4, 4],
                 block: str or Type[InvResMLP] = 'InvResMLP',
                 nsample: int or List[int] = 32,
                 radius: float or List[float] = 0.1,
                 aggr_args: dict = {'feature_type': 'dp_fj', "reduction": 'max'},
                 group_args: dict = {'NAME': 'ballquery'},
                 sa_layers: int = 1,
                 sa_use_res: bool = False,
        if isinstance(block, str):
            block = eval(block)  # <class 'openpoints.models.backbone.pointnext.InvResMLP'>
        self.blocks = blocks
        self.strides = strides
        self.in_channels = in_channels
        self.aggr_args = aggr_args
        self.norm_args = kwargs.get('norm_args', {'norm': 'bn'}) 
        self.act_args = kwargs.get('act_args', {'act': 'relu'}) 
        self.conv_args = kwargs.get('conv_args', None)
        self.sampler = kwargs.get('sampler', 'fps')  # 键值不存在时, 默认值为 fps
        self.expansion = kwargs.get('expansion', 4)
        self.sa_layers = sa_layers
        self.sa_use_res = sa_use_res
        self.use_res = kwargs.get('use_res', True)
        radius_scaling = kwargs.get('radius_scaling', 2)
        nsample_scaling = kwargs.get('nsample_scaling', 1)

        self.radii = self._to_full_list(radius, radius_scaling)
        # [[0.1], [0.1], [0.2], [0.4], [0.8]] in PointNeXt-S
        self.nsample = self._to_full_list(nsample, nsample_scaling)
        # [[32], [32], [32], [32], [32]] in PointNeXt-S
        logging.info(f'radius: {self.radii},\n nsample: {self.nsample}')

        # double width after downsampling.
        channels = []
        for stride in strides:
            if stride != 1:
                width *= 2
        # self.strides = [1, 4, 4, 4, 4]
        # channels = [32, 64, 128, 256, 512]
        # channels 在翻倍翻倍地增加
        encoder = []  # encoder 将由更细粒的 5 个模块/类组成
        for i in range(len(blocks)):  # blocks = [1, 1, 1, 1, 1]
            group_args.radius = self.radii[i]
            group_args.nsample = self.nsample[i]
                block, channels[i], blocks[i], stride=strides[i], group_args=group_args,
                is_head=i == 0 and strides[i] == 1
            # i = 0 时, is_head = True; 其他都是 False
        self.encoder = nn.Sequential(*encoder)
        self.out_channels = channels[-1]  # 最后一个通道数作为该模块/类的输出通道数
        self.channel_list = channels

    def _to_full_list(self, param, param_scaling=1):
        # param can be: radius, nsample
        param_list = []
        if isinstance(param, List):
            # make param a full list
            for i, value in enumerate(param):
                value = [value] if not isinstance(value, List) else value
                if len(value) != self.blocks[i]:
                    value += [value[-1]] * (self.blocks[i] - len(value))
        else:  # radius is a scalar (in this case, only initial raidus is provide), then create a list (radius for each block)
            for i, stride in enumerate(self.strides):
                if stride == 1:
                    param_list.append([param] * self.blocks[i])
                        [param] + [param * param_scaling] * (self.blocks[i] - 1))
                    param *= param_scaling
        return param_list

    def _make_enc(self, block, channels, blocks, stride, group_args, is_head=False):  # "_make_encode"
        layers = []
        radii = group_args.radius
        nsample = group_args.nsample
        group_args.radius = radii[0]
        group_args.nsample = nsample[0]
        layers.append(SetAbstraction(self.in_channels, channels,
                                     self.sa_layers if not is_head else 1, stride,
                                     norm_args=self.norm_args, act_args=self.act_args, conv_args=self.conv_args,
                                     is_head=is_head, use_res=self.sa_use_res, **self.aggr_args 
        # encoder 将由更细粒的 5 个 SetAbstraction 模块/类组成
        self.in_channels = channels
        for i in range(1, blocks): 
            # 'PointNeXt-S => B=0', blocks = 1 故 range(1, blocks) = None, 整个网络中没有 InvResMLP
            group_args.radius = radii[i]
            group_args.nsample = nsample[i]
                                norm_args=self.norm_args, act_args=self.act_args, group_args=group_args,
                                conv_args=self.conv_args, expansion=self.expansion,
        return nn.Sequential(*layers)

    def forward_cls_feat(self, p0, f0=None):
        if hasattr(p0, 'keys'):
            p0, f0 = p0['pos'], p0.get('x', None)
        if f0 is None:
            f0 = p0.clone().transpose(1, 2).contiguous()
        for i in range(0, len(self.encoder)):
            p0, f0 = self.encoder[i]([p0, f0])
        return f0.squeeze(-1)

    def forward_seg_feat(self, p0, f0=None):  # p0 = data
        if hasattr(p0, 'keys'):  # True
            p0, f0 = p0['pos'], p0.get('x', None)  
            # p0.shape = torch.Size([32, 24000, 3])
            # f0.shape = torch.Size([32, 4, 24000])
        if f0 is None:
            f0 = p0.clone().transpose(1, 2).contiguous()
        p, f = [p0], [f0]
        for i in range(0, len(self.encoder)):  # len(self.encoder) = 5
            _p, _f = self.encoder[i]([p[-1], f[-1]])  
            # p[-1] 是 p 列表中的最后一个,就是前一个 encoder 子类对象传过来的; f[-1] 类似
            # p 和 f 中累计了初始 "pos" 位置数据 和 "x" 特征数据, 
            # 以及经过每一个 self.encoder[i] 后的 _p 和 _f 数据 
        return p, f

    def forward(self, p0, f0=None):
        return self.forward_seg_feat(p0, f0)
        # 前向传递过程

PointNextEncoder 类对象 encoder 的前向传播过程, 如 Fig. 2 所示:

Fig. 2 PointNextEncoder 对象及其所包含的 SetAbstraction 对象的前向传播过程

可以看出, 在 encoder 的前向传播过程中, 点云数量越来越少, 而特征维度越来越高.

具体实现细节, 还需要深入阅读分析 SetAbstraction 类才可知 (下文).

2. PointNextDecoder 类

对 PointNextDecoder 类的源代码注释如下:

class PointNextDecoder(nn.Module):
    def __init__(self,
                 encoder_channel_list: List[int],
                 decoder_layers: int = 2,
                 decoder_stages: int = 4, 
        self.decoder_layers = decoder_layers  # 2
        self.in_channels = encoder_channel_list[-1] 
        # 512, encoder 的最后一层通道数作为 decoder 的输入通道数
        skip_channels = encoder_channel_list[:-1]  # [32, 64, 128, 256]
        if len(skip_channels) < decoder_stages:
            skip_channels.insert(0, kwargs.get('in_channels', 3))
        # the output channel after interpolation
        fp_channels = encoder_channel_list[:decoder_stages]
        # [32, 64, 128, 256], 就是每一个 Feat Propagation 模块的输出通道数

        n_decoder_stages = len(fp_channels)  # 4
        decoder = [[] for _ in range(n_decoder_stages)] # decoder = [[], [], [], []]
        for i in range(-1, -n_decoder_stages - 1, -1):  # -1, -2, -3, -4
            decoder[i] = self._make_dec(
                skip_channels[i], fp_channels[i])
        # decoder 将由更细粒的 4 个 FeaturePropogation 模块/类组成
        self.decoder = nn.Sequential(*decoder)
        self.out_channels = fp_channels[-n_decoder_stages]
        # 注意顺序是反过来的. 最小的通道数是输出通道数.

    def _make_dec(self, skip_channels, fp_channels):  # "_make_decoder"
        layers = []
        mlp = [skip_channels + self.in_channels] + \
              [fp_channels] * self.decoder_layers
        # list 合并
        # mlp = [768, 256, 256], [384, 128, 128], [192, 64, 64], [96, 32, 32]
        # 其中 “[skip_channels + self.in_channels]” 是因为 Feat Propagation 模块在对 skip connection 进行插值过后, 
        # 要在特征维度/通道数上进行连接 concat, 使得通道数/特征维度增加到 mlp[0] = skip_channels + self.in_channels

        # 生成一个 FeaturePropogation 类对象
        self.in_channels = fp_channels # 当前输出通道数就是下一个的输入通道数
        return nn.Sequential(*layers)

    def forward(self, p, f):
        for i in range(-1, -len(self.decoder) - 1, -1):
            f[i - 1] = self.decoder[i][1:](
                [p[i], self.decoder[i][0]([p[i - 1], f[i - 1]], [p[i], f[i]])])[1]
        # decoder 部分, 表示点云位置 'pos' 的 p, 维持 encoder 部分传过来的 p 不变 
        # 注意 decoder 内部的 FeaturePropogation 类对象 顺序是反着的
        # self.decoder[i][1:]() = Sequential() 外层计算没有效果
        # f[i-1] = self.decoder[i][0]([p[i - 1], f[i - 1]], [p[i], f[i]]) 
        # 对应于 FeaturePropogation 的前向传播过程
        # [p[i - 1], f[i - 1]] 就是 U-Net 的 skip connection的输入
        # 只更新了特征部分 "x" f[i - 1], 位置部分 "pos" p[i-1] 没更新
        return f[-len(self.decoder) - 1]

PoinrNextDecoder 类对象 decoder 的前向传播过程, 如 Fig. 3:

Fig. 3 PointNextDecoder 对象及其所包含的 FeaturePropogation 对象的前向传播过程

可以看出 PointNeXt 也是基于 U-Net 网络架构. U-Net 网络相对于其他深度网络的区别是, 能够对每个像素/每个点进行分类, 而不是对整张图片或者整个点云场景进行分类. 这样就很好地对应了图像或点云的语义分割任务.

针对 PointNextDecoder 类更多的细节需要深入到 FeaturePropogation 类中进行探索 (下文).

3. SegHead 类

卷积核长度为 1 的一维卷积运算, 相当于对一点的所有通道上的特征值作全连接运算, 输出这一点的分类结果.

其他都相对比较直观, 此处省略.

III. 点集提取与特征传播 ——Tier 2

1. SetAbstraction 类

PointNeXt-S 网络中只包含 SetAbstraction 模块, 没有包含 InvResMLP 模块, 已经简化了不少.

SetAbstraction 类源码的注释如下, SetAbstraction 类对象的前向传播过程如 Fig. 2 中所示.

class SetAbstraction(nn.Module):
    """The modified set abstraction module in PointNet++ with residual connection support

    def __init__(self,
                 in_channels, out_channels,
                 group_args={'NAME': 'ballquery',
                             'radius': 0.1, 'nsample': 16},
                 norm_args={'norm': 'bn1d'},
                 act_args={'act': 'relu'},
        self.stride = stride  # 下采样的步幅, 一直保持 4
        self.is_head = is_head
        self.all_aggr = not is_head and stride == 1   # False
        self.use_res = use_res and not self.all_aggr and not self.is_head # encoder[1] True
        self.feature_type = feature_type  # 'dp_fj'

        mid_channel = out_channels // 2 if stride > 1 else out_channels  # encoder[1] 32
        channels = [in_channels] + [mid_channel] * \
                   (layers - 1) + [out_channels]
        channels[0] = in_channels if is_head else CHANNEL_MAP[feature_type](channels[0])
        # CHANNEL_MAP = {
        #     'dp_fj': lambda x: 3 + x,
        # }

        # encoder[0]'s channel: [4, 32]
        # encoder[1]'s channel: [35, 32, 64]
        if self.use_res:  #  encoder[0] False , others True
            self.skipconv = create_convblock1d(
                in_channels, channels[-1], norm_args=None, act_args=None) if in_channels != channels[
                -1] else nn.Identity()
            self.act = create_act(act_args)

        # actually, one can use local aggregation layer to replace the following
        create_conv = create_convblock1d if is_head else create_convblock2d
        # encoder[0]: create_convblock1d;   encoder[1]: create_convblock2d
        convs = []
        for i in range(len(channels) - 1):
            convs.append(create_conv(channels[i], channels[i + 1],
                                     norm_args=norm_args if not is_head else None,
                                     act_args=None if i == len(channels) - 2
                                                      and (self.use_res or is_head) else act_args,
        self.convs = nn.Sequential(*convs)
        if not is_head:
            if self.all_aggr:  # False
                group_args.nsample = None
                group_args.radius = None
            self.grouper = create_grouper(group_args)  # 邻域分组方法 QueryAndGroup
            self.pool = lambda x: torch.max(x, dim=-1, keepdim=False)[0]
            if sampler.lower() == 'fps':
                self.sample_fn = furthest_point_sample   # 下采样方法
            elif sampler.lower() == 'random':
                self.sample_fn = random_sample

    def forward(self, pf):
        p, f = pf  # p.shape = torch.Size([32, 24000, 3]), f.shape = torch.Size([32, 32, 24000])
        if self.is_head:
            f = self.convs(f)  # (n, c)
            if not self.all_aggr:  # encoder[1] 执行
                idx = self.sample_fn(p, p.shape[1] // self.stride).long()   
                # encoder[1]'s idx.shape = torch.Size([32, 6000])
                # self.stride 下采样的步幅
                new_p = torch.gather(p, 1, idx.unsqueeze(-1).expand(-1, -1, 3))  
                # 沿着 dim=1 以 idx 采样 p
                # 点云 p.shape = torch.Size([32, 24000, 3]), 下采样后 new_p = torch.Size([32, 6000, 3])
                new_p = p
            """ DEBUG neighbor numbers. 
            query_xyz, support_xyz = new_p, p
            radius = self.grouper.radius
            dist = torch.cdist(query_xyz.cpu(), support_xyz.cpu())
            points = len(dist[dist < radius]) / (dist.shape[0] * dist.shape[1])
            logging.info(f'query size: {query_xyz.shape}, support size: {support_xyz.shape}, radius: {radius}, num_neighbors: {points}')
            DEBUG end """
            if self.use_res or 'df' in self.feature_type:   # self.feature_type = 'dp_fj'
                fi = torch.gather(
                    f, -1, idx.unsqueeze(1).expand(-1, f.shape[1], -1))
                # 沿着 dim=2 以 idx 采样 f
                # 点云特征 f.shape = torch.Size([32, 32, 24000]), 下采样后 fi = torch.Size([32, 32, 6000])
                if self.use_res:  # True
                    identity = self.skipconv(fi)   
                    # identity.shape = torch.Size([32, 64, 6000])
                    # 得到的是直接对采样后点云特征本身的卷积处理结果
                fi = None
            dp, fj = self.grouper(new_p, p, f)  
            # 以 new_p 中每个点为中心的邻域内各点形成的 group
            # 即使采样后各点为中心的邻域内的点及其特征组成 group
            # dp.shape = torch.Size([32, 3, 6000, 32])
            # fj.shape = torch.Size([32, 32, 6000, 32])
            fj = get_aggregation_feautres(new_p, dp, fi, fj, feature_type=self.feature_type)
            # dp 与 fj 在 dim=1 上作连接 torch.cat, 新的 fj.shape = torch.Size([32, 35, 6000, 32])
            # 这样 fj 通道数/特征维度 就是 35, 与即将计算的二维卷积的输入通道数一致
            # dim=2 and 3 这两维将被 kernel_size = (1,1) 的二维卷积核做二维卷积运算
            f = self.pool(self.convs(fj))
            # self.convs(fj) 输出数据 shape 为 torch.Size([32, 64, 6000, 32])
            # self.pool 在最后一个维度上求 max, 得到 f.shape = torch.Size([32, 64, 6000])
            # 特征计算中已经涉及到的 group、pool 都与点云顺序无关, 只与点云之间的位置关系有关
            # 得到的是对采样后点云各自邻域内各点云特征 group 的卷积处理结果
            if self.use_res:
                f = self.act(f + identity)
                # “f + identity” 做残差运算, 
                # identity —— 直接对采样后点云特征本身的卷积处理结果, 
                # f —— 采样后点云各自邻域内各点云特征 group 的卷积处理结果
            p = new_p  # 下采后的点云
        return p, f

这样实现对点云特征的提取. PointNextEncoder 模块利用串联的 SetAbstraction 模块, 一层一层地提取获得更深层次的特征.

2. FeaturePropogation 类

FeaturePropogation 类源码的注释如下, FeaturePropogation 类对象的前向传播过程如 Fig. 3 中所示. 其中关键是特征插值特征融合.

class FeaturePropogation(nn.Module):
    """The Feature Propogation module in PointNet++

    def __init__(self, mlp,
                 norm_args={'norm': 'bn1d'},
                 act_args={'act': 'relu'}
            mlp: [current_channels, next_channels, next_channels]
        if not upsample:
            self.linear2 = nn.Sequential(
                nn.Linear(mlp[0], mlp[1]), nn.ReLU(inplace=True))
            mlp[1] *= 2
            linear1 = []
            for i in range(1, len(mlp) - 1):
                linear1.append(create_convblock1d(mlp[i], mlp[i + 1],
                                                  norm_args=norm_args, act_args=act_args
            self.linear1 = nn.Sequential(*linear1)
            convs = []
            for i in range(len(mlp) - 1):
                convs.append(create_convblock1d(mlp[i], mlp[i + 1],
                                                norm_args=norm_args, act_args=act_args
            self.convs = nn.Sequential(*convs)

        self.pool = lambda x: torch.mean(x, dim=-1, keepdim=False)

    def forward(self, pf1, pf2=None):
        # pfb1 is with the same size of upsampled points
        if pf2 is None:
            _, f = pf1  # (B, N, 3), (B, C, N)
            f_global = self.pool(f)
            f = torch.cat(
                (f, self.linear2(f_global).unsqueeze(-1).expand(-1, -1, f.shape[-1])), dim=1)
            f = self.linear1(f)
            p1, f1 = pf1
            p2, f2 = pf2
            # p1.shape = [BatchSize, N-points, XYZ]
            # f1.shape = [BatchSize, Features, N-Points]
            # p1, f1 对应 skip-connection
            # p1, f1 —— 点多, 但特征维度少. 为需要在特征维度上插值
            # p2, f2 —— 点少, 但特征纬度高. 为特征维度上插值时的特征基础材料
            if f1 is not None:
                f = self.convs(
                    torch.cat((f1, three_interpolation(p1, p2, f2)), dim=1))
                # three_interpolation(p1, p2, f2) 是进行特征插值, 以与每一个 p1 点最接近的 p2 中的三个点的特征来插值
                # 这样得到的特征维度(特征通道)和 f2 一样高, 点数量(特征条目)和 p1 一样多 
                # torch.cat((f1, three_interpolation(p1, p2, f2)), dim=1) 在 特征维度/通道数上 上进行拼接/连接, 
                # 使得通道数/特征维度 增加到 mlp[0]
                # 例子:
                # f1.shape = torch.Size([32, 256, 375])
                # p1.shape = torch.Size([32, 375, 3])
                # f2.shape = torch.Size([32, 512, 93])
                # p2.shape = torch.Size([32, 93, 3])
                # three_interpolation(p1, p2, f2).shape = torch.Size([32, 512, 375])
                # torch.cat((f1, three_interpolation(p1, p2, f2)), dim=1).shape = torch.Size([32, 768, 375])
                # 这样就融合了 f1 和 f2 两个层次的特征, 其中 f1 是相对浅层次的特征, f2 是相对深层次的特征
                # 再把融合后的新特征投入卷积网络

                f = self.convs(three_interpolation(p1, p2, f2))
        return f


学习分析了三维点云深度网络 PointNeXt 的基础构建模块 —— BaseSeg、PointNextEncoder、PointNextDecoder、SetAbstraction、FeaturePropogation 等.

在整个 PointNeXt 模型中, 这里涉及的为模型的高层次模块 (Tier 0、Tier 1、Tier 2).

其中 InvResMLP 还留待后面学习分析.

另外, 点云采样、特征插值等更基础的算法源码也待阅读分析.





