文章目录
- datasets/AB2019BASDataset.py
- datasets/ext_transforms.py
- network/modules.py
- network/DBINet.py
- network/DBINet_Backbone.py
- AB2019_train.py
代码链接:DBINet
datasets/AB2019BASDataset.py
加载Australia Bushfire 2019 Burned Area Segmentation Dataset
的自定义数据集类AB2019BASDataset
。
def __init__(self, is_train, voc_dir)
:输入数据集目录路径voc_dir
,数据jpeg图像路径为voc_dir/Images
、掩膜png图像路径为voc_dir/Masks
。def __getitem__(self, idx)
:根据索引获取图像和掩模,分别转换为RGB和灰度格式并返回。def __len__(self)
:返回数据集样本总数。
执行逻辑见代码复现(二)。
datasets/ext_transforms.py
定义了一些扩展的图像变换类,主要用于语义分割任务。
network/modules.py
定义了DBINet、ResNet、PVT模型中的常用模块。
class ChannelAttention(nn.Module)
:实现通道注意力机制,以增强神经网络模型中通道维度的重要性。class SpatialAttention(nn.Module)
:实现空间注意力机制,以增强神经网络模型中空间维度的重要性。def autopad(k, p=None, d=1)
:根据卷积核大小、填充方式、膨胀系数来计算padding
参数,使得卷积操作的输出尺寸与输入尺寸一致。class Conv(nn.Module)
:实现卷积模块,包含 C o n v + B a t c h N o r m a l i z a t i o n + R e L U Conv+BatchNormalization+ReLU Conv+BatchNormalization+ReLU。class CBAM(nn.Module)
:实现卷积注意力模块。class Bottleneck(nn.Module)
:实现ResNet的瓶颈块结果。class C2(nn.Module)
:实现基于CSP(Cross Stage Partial)架构的瓶颈模块。class SRM(nn.Module)
:定于 S e l f − R e f i n e m e n t Self-Refinement Self−Refinement模块,见论文Attention guided contextual feature fusion network for salient object detection、EfficientNet: Rethinking Model Scaling for Convolutional Neural Networks。class Decoder(nn.Module)
:定义解码器模块。
class Decoder(nn.Module):
#ch_in:输入通道数;ch_out:输出通道数;n:Bottleneck模块的数量;shortcut:是否使用残差连接;g:组卷积的数量;e:膨胀系数
def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5): # ch_in, ch_out, number, shortcut, groups, expansion
super().__init__()
#隐藏通道数
self.c = int(c2 * e)
self.cv1 = Conv(c1, 2 * self.c, 1, 1)
self.cv2 = Conv(2 * self.c, c2, 1) # optional act=FReLU(c2)
#n个Bottleneck模块组成的顺序容器,用于提取特征
self.m = nn.Sequential(*(Bottleneck(self.c, self.c, shortcut, g, k=((3, 3), (3, 3)), e=1.0) for _ in range(n)))
#自我精细化模块(Self Refinement Module)
self.sr=SRM(c2)
def forward(self, x):
a, b = self.cv1(x).split((self.c, self.c), 1)
out=self.cv2(torch.cat((self.m(a), b), 1))
out=self.sr(out)
return out
class REBNCONV(nn.Module)
:定义卷积模块,包含 C o n v + B a t c h N o r m l i z a t i o n + R e L U Conv+BatchNormlization+ReLU Conv+BatchNormlization+ReLU操作。def _upsample_like(src,tar)
:通过线性插值法完成上采样操作。class Convertor(nn.Module)
:定义转换器模块。
class Convertor(nn.Module):
def __init__(self, in_ch=64, out_ch=64):
super(Convertor,self).__init__()
mid_ch=in_ch
self.rebnconv1 = REBNCONV(mid_ch,mid_ch,dirate=1)
self.pool1 = nn.MaxPool2d(2,stride=2,ceil_mode=True)
self.rebnconv2 = REBNCONV(mid_ch,mid_ch,dirate=1)
self.pool2 = nn.MaxPool2d(2,stride=2,ceil_mode=True)
self.rebnconv3 = REBNCONV(mid_ch,mid_ch,dirate=1)
self.pool3 = nn.MaxPool2d(2,stride=2,ceil_mode=True)
self.rebnconv4 = REBNCONV(mid_ch,mid_ch,dirate=1)
self.rebnconv5 = REBNCONV(mid_ch,mid_ch,dirate=2)
self.cbam1=CBAM(mid_ch, mid_ch)
self.cbam2=CBAM(mid_ch, mid_ch)
self.cbam3=CBAM(mid_ch, mid_ch)
self.cbam4=CBAM(mid_ch, mid_ch)
self.rebnconv4d = REBNCONV(mid_ch*2,mid_ch,dirate=1)
self.rebnconv3d = REBNCONV(mid_ch*2,mid_ch,dirate=1)
self.rebnconv2d = REBNCONV(mid_ch*2,mid_ch,dirate=1)
self.rebnconv1d = REBNCONV(mid_ch*2,out_ch,dirate=1)
def forward(self,x1,x2,x3,x4):
hx1 = self.rebnconv1(x1)
hx = self.pool1(hx1)
hx2 = self.rebnconv2(hx+x2)
hx = self.pool2(hx2)
hx3 = self.rebnconv3(hx+x3)
hx = self.pool3(hx3)
hx4 = self.rebnconv4(hx+x4)
hx5 = self.rebnconv5(hx4)
hx4d = self.rebnconv4d(torch.cat((hx5,self.cbam4(hx4)),1))
hx4dup = _upsample_like(hx4d,hx3)
hx3d = self.rebnconv3d(torch.cat((hx4dup,self.cbam3(hx3)),1))
hx3dup = _upsample_like(hx3d,hx2)
hx2d = self.rebnconv2d(torch.cat((hx3dup,self.cbam2(hx2)),1))
hx2dup = _upsample_like(hx2d,hx1)
hx1d = self.rebnconv1d(torch.cat((hx2dup,self.cbam1(hx1)),1))
return hx1d, hx2d, hx3d, hx4d
network/DBINet.py
定义类DBINet,包含了模型的总体执行流程。
def __init__(self,**kwargs)
:初始化DBINet的各个模块。def forward(self,inputs)
:定义DBINet的前向传播过程。
class DBINet(nn.Module):
def __init__(self, **kwargs):
super(DBINet, self).__init__()
#初始化DBI编码器架构
self.backbone = DBIBkb()
#定义特征图通道数,mid_ch表中间层通道数,eout_channels包含从主干网络输出的不同阶段特征图通道数,out_ch是最终输出的通道数
mid_ch=64
eout_channels=[64, 256, 512, 1024, 2048]
out_ch=1
#对解码器输出四张特征图的卷积操作,卷积将不同层级输出的特征图压缩到mid_ch通道数
self.eside1=Conv(eout_channels[1], mid_ch)
self.eside2=Conv(eout_channels[2], mid_ch)
self.eside3=Conv(eout_channels[3], mid_ch)
self.eside4=Conv(eout_channels[4], mid_ch)
#定义转换器
self.convertor=Convertor(mid_ch, mid_ch)
#定义上采样,可将特征图空间分辨率增加两倍
self.upsample2 = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
#定义解码器部分
self.decoder4 = Decoder(mid_ch, mid_ch)
self.decoder3 = Decoder(mid_ch, mid_ch)
self.decoder2 = Decoder(mid_ch, mid_ch)
self.decoder1 = Decoder(mid_ch, mid_ch)
#定义对特征图s1、s2、s3、s4的卷积操作
self.dside1 = nn.Conv2d(mid_ch, out_ch, kernel_size=3, stride=1, padding=1)
self.dside2 = nn.Conv2d(mid_ch, out_ch, kernel_size=3, stride=1, padding=1)
self.dside3 = nn.Conv2d(mid_ch, out_ch, kernel_size=3, stride=1, padding=1)
self.dside4 = nn.Conv2d(mid_ch, out_ch, kernel_size=3, stride=1, padding=1)
#使用kaiming初始化法初始化卷积层和批量归一化层的参数
for m in self.modules():
if isinstance(m, nn.Conv2d):
init_weights(m, init_type='kaiming')
elif isinstance(m, nn.BatchNorm2d):
init_weights(m, init_type='kaiming')
def forward(self, inputs):
#获取输入特征图的高和宽,后续用于上采样时恢复原始尺寸
H, W = inputs.size(2), inputs.size(3)
#编码器输出(四个ResNet Block的输出和最后DFM融合的输出特征)
outs=self.backbone(inputs)
c1, c2, c3, c4, c5 = outs
#通过卷积操作改变编码器输出特征图的通道数
c1=self.eside1(c1)
c2=self.eside2(c2)
c3=self.eside3(c3)
c4=self.eside4(c4)
#接受转换器的输出
ca1,ca2,ca3,ca4=self.convertor(c1,c2,c3,c4)
#将ca1特征图通过双线性插值的方式调整到ca2, ca3, ca4 的尺寸,便于后续特征融合
ca12=F.interpolate(ca1, size=ca2.size()[2:], mode='bilinear', align_corners=True)
ca13=F.interpolate(ca1, size=ca3.size()[2:], mode='bilinear', align_corners=True)
ca14=F.interpolate(ca1, size=ca4.size()[2:], mode='bilinear', align_corners=True)
#将转换器输出、编码器输出、上一解码器的输出(decoder4则对应DFM的输出)进行融合,输入到解码器模块中
up4=self.decoder4(ca14 + c4 + c5)
up3=self.decoder3(ca13 + c3 + self.upsample2(up4))
up2=self.decoder2(ca12 + c2 + self.upsample2(up3))
up1=self.decoder1(ca1 + c1 + self.upsample2(up2))
#通过卷积操作将特征图up1、up2、up3、up4转换为单通道的输出特征图d1、d2、d3、d4
d1=self.dside1(up1)
d2=self.dside2(up2)
d3=self.dside3(up3)
d4=self.dside4(up4)
#通过插值法将输出特征图恢复到图像的原始尺寸(Upsample)
S1 = F.interpolate(d1, size=(H, W), mode='bilinear', align_corners=True)
S2 = F.interpolate(d2, size=(H, W), mode='bilinear', align_corners=True)
S3 = F.interpolate(d3, size=(H, W), mode='bilinear', align_corners=True)
S4 = F.interpolate(d4, size=(H, W), mode='bilinear', align_corners=True)
#返回四张特征图,并进行sigmoid操作作为最终输出
return S1,S2,S3,S4, torch.sigmoid(S1), torch.sigmoid(S2), torch.sigmoid(S3), torch.sigmoid(S4)
network/DBINet_Backbone.py
定义DBINet编码器的执行流程。
def autopad(k, p=None, d=1)
:实现same
填充。计算卷积操作中的填充(padding)参数,确保输出形状与输入形状相同。k
:卷积核大小。p
:是否使用填充。d
:空洞卷积膨胀系数。
def autopad(k, p=None, d=1): # kernel, padding, dilation
# Pad to 'same' shape outputs
if d > 1:
k = d * (k - 1) + 1 if isinstance(k, int) else [d * (x - 1) + 1 for x in k] # actual kernel-size
if p is None:
p = k // 2 if isinstance(k, int) else [x // 2 for x in k] # auto-pad
return p
class Conv(nn.Module)
:实现DBINet中的卷积模块( C o n v + B a t c h N o r m a l i z a t i o n + R e L U Conv+Batch\;Normalization+ReLU Conv+BatchNormalization+ReLU)。def __init__(self, c1, c2, k=1, s=1, p=None, g=1, d=1, act=True)
:
def __init__(self, c1, c2, k=1, s=1, p=None, g=1, d=1, act=True):
super().__init__()
self.conv = nn.Conv2d(c1, c2, k, s, autopad(k, p, d), groups=g, dilation=d, bias=False)
self.bn = nn.BatchNorm2d(c2)
self.act = self.default_act if act is True else act if isinstance(act, nn.Module) else nn.Identity()
def forward(self, x)
:定义前向传播过程( C o n v + B a t c h N o r m a l i z a t i o n + R e L U Conv+Batch\;Normalization+ReLU Conv+BatchNormalization+ReLU)。
def forward(self, x):
return self.act(self.bn(self.conv(x)))
def forward_fuse(self, x)
:前向融合传播,跳过批归一化层,只进行卷积和激活操作。这个函数常用于推理阶段,通过移除批归一化层加快推理速度。
def forward_fuse(self, x):
return self.act(self.conv(x))
def weight_init(module)
:递归遍历一个模型的所有子模块,并根据模块类型(如卷积层、批归一化层、线性层等)使用不同方法进行初始化。
def weight_init(module):
#named_children()返回模型的所有子模块的名称和对应的子模块对象
for n, m in module.named_children():
print('initialize: '+n)
if isinstance(m, nn.Conv2d):
#卷积层使用Kaiming正态分布初始化权重,mode='fan_in'表示根据输入特征图数量来计算缩放系数,nonlinearity='relu'表明卷积层后面使用的是ReLU激活函数
nn.init.kaiming_normal_(m.weight, mode='fan_in', nonlinearity='relu')
#若有偏置则初始化为零
if m.bias is not None:
nn.init.zeros_(m.bias)
#批归一化层(BatchNorm2d)或组归一化层(GroupNorm)将尺度因子gamma初始化为1,偏置初始化为0.
elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
nn.init.ones_(m.weight)
if m.bias is not None:
nn.init.zeros_(m.bias)
#线性层初始化方式同卷积层
elif isinstance(m, nn.Linear):
nn.init.kaiming_normal_(m.weight, mode='fan_in', nonlinearity='relu')
if m.bias is not None:
nn.init.zeros_(m.bias)
else:
#其他自定义的子模块,则使用自己的initialize()方法初始化
m.initialize()
class Bottleneck(nn.Module)
:实现ResNet网络中的瓶颈块(Bottleneck Block,上图即为普通残差结构与瓶颈结构)。通常用于深层卷积神经网络中,通过引入1x1的卷积来减少维度,然后再用 3x3 的卷积进行特征提取,最后通过另一个 1x1 的卷积恢复维度。这样设计能够降低计算量,同时保持较深网络的表达能力。__init__(self, inplanes, planes, stride=1, downsample=None, dilation=1)
def __init__(self, inplanes, planes, stride=1, downsample=None, dilation=1):
super(Bottleneck, self).__init__()
#1x1卷积
self.conv1 = nn.Conv2d(inplanes, planes, kernel_size=1, bias=False)
#batch normalization
self.bn1 = nn.BatchNorm2d(planes)
#3x3卷积,padding可使得在不改变特征图大小的情况下进行卷积操作
self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=(3*dilation-1)//2, bias=False, dilation=dilation)
self.bn2 = nn.BatchNorm2d(planes)
#3x3卷积,通道数扩展到4*planes来恢复到瓶颈块的输出维度
self.conv3 = nn.Conv2d(planes, planes*4, kernel_size=1, bias=False)
self.bn3 = nn.BatchNorm2d(planes*4)
#可选的下采样层
self.downsample = downsample
def forward(self,x)
:定义前向传播。
def forward(self, x):
#残差分支
residual = x
out = F.relu(self.bn1(self.conv1(x)), inplace=True)
out = F.relu(self.bn2(self.conv2(out)), inplace=True)
out = self.bn3(self.conv3(out))
#若downsample不为空,则调整残差分支尺寸,使之能与out相加
if self.downsample is not None:
residual = self.downsample(x)
#残差连接+relu
return F.relu(out+residual, inplace=True)
class DFM(nn.Module)
:定义DFM模块。
class DFM(nn.Module):
def __init__(self, cur_in_ch, sup_in_ch, out_ch, mid_ch=64):
super(DFM, self).__init__()
#f_cur对应的卷积模块(Conv+BatchNorm+ReLU)
self.cur_cv = Conv(cur_in_ch, mid_ch)
#f_sup对应的卷积模块(Conv+BatchNorm+ReLU)
self.sup_cv = Conv(sup_in_ch, mid_ch)
self.fuse1 = Conv(mid_ch*2, mid_ch)
self.fuse2 = Conv(mid_ch, out_ch)
def forward(self, x_cur, x_sup):
#f_cur、f_sup输入卷积模块
x_cur1=self.cur_cv(x_cur)
x_sup1=self.sup_cv(x_sup)
#将f_cur、f_sup进行拼接
xfuse=torch.cat([x_cur1, x_sup1], dim=1)
#xfuse输入卷积模块,并与x_cur1执行元素加法
x=self.fuse1(xfuse) + x_cur1
#输入卷积模块得到f_out
x=self.fuse2(x)
return x
class DBIkb(nn.Module)
:实现DBI模型的编码器架构。__init__(self, img_size=224, patch_size=4, in_chans=256, embed_dims=[64, 128, 320, 512], num_heads=[1, 2, 5, 8], mlp_ratios=[8, 8, 4, 4], qkv_bias=True, qk_scale=None, drop_rate=0.0, attn_drop_rate=0., drop_path_rate=0.1, norm_layer=partial(nn.LayerNorm, eps=1e-6), depths=[2, 2, 2, 2], sr_ratios=[8, 4, 2, 1], num_stages=4, F4=False)
:实现了用作主编码器的 R e s N e t 50 ResNet50 ResNet50和用作辅助编码器的 P V T ( P y r a m i d V i s i o n T r a n s f o r m e r ) PVT(Pyramid\;Vision\;Transformer) PVT(PyramidVisionTransformer), R e s N e t ResNet ResNet提取初步的低层次特征, P V T PVT PVT通过注意力机制提取高层次特征,并且使用 D F M DFM DFM进行多层次、多模态的特征融合,增强模型的表现能力。
def __init__(self, img_size=224, patch_size=4, in_chans=256, embed_dims=[64, 128, 320, 512],
num_heads=[1, 2, 5, 8], mlp_ratios=[8, 8, 4, 4], qkv_bias=True, qk_scale=None, drop_rate=0.0,
attn_drop_rate=0., drop_path_rate=0.1, norm_layer=partial(nn.LayerNorm, eps=1e-6), depths=[2, 2, 2, 2],
sr_ratios=[8, 4, 2, 1], num_stages=4, F4=False):
super().__init__()
#定义ResNet模块初步对输入图像特征提取,其中由make_layer()构建每一层ResNet的具体实现
mid_ch = 64
self.inplanes = 64
self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
self.bn1 = nn.BatchNorm2d(64)
self.layer1 = self.make_layer(64, 3, stride=1, dilation=1)
self.layer2 = self.make_layer(128, 4, stride=2, dilation=1)
self.layer3 = self.make_layer(256, 6, stride=2, dilation=1)
self.layer4 = self.make_layer(512, 3, stride=2, dilation=1)
self.CBkbs = [self.layer1, self.layer2, self.layer3, self.layer4]
#定义DFM模块,用于在用于在不同层级当中融合ResNet和PVT提取的特征
rout_chl = [256, 512, 1024, 2048]
#融合ResNet的当前特征和PVT的支持特征,作为下一ResNet模块的输入(位于主编码器中)
self.cdfm1 = DFM(cur_in_ch=rout_chl[1], sup_in_ch=embed_dims[1], out_ch=rout_chl[1])
self.cdfm2 = DFM(cur_in_ch=rout_chl[2], sup_in_ch=embed_dims[2], out_ch=rout_chl[2])
#融合PVT的当前特征和ResNet的支持特征,作为下一PVT模块的输入(位于辅助编码器中)
self.tdfm1 = DFM(cur_in_ch=embed_dims[1], sup_in_ch=rout_chl[1], out_ch=embed_dims[1])
self.tdfm2 = DFM(cur_in_ch=embed_dims[2], sup_in_ch=rout_chl[2], out_ch=embed_dims[2])
#融合主、辅助编码器的输出,作为解码器的输入之一
self.sumdfm = DFM(cur_in_ch=rout_chl[3], sup_in_ch=embed_dims[3], out_ch=mid_ch)
#主编码器DFM模块列表
self.cdfmx = [self.cdfm1, self.cdfm2]
#辅助编码器DFM模块列表
self.tdfmx = [self.tdfm1, self.tdfm2]
#定义PVT模块
#每个阶段包含的注意力块数目
self.depths = depths
self.F4 = F4
self.num_stages = num_stages
#每个层分配的路径丢弃率(drop path rate)
dpr = [x.item() for x in torch.linspace(0, drop_path_rate, sum(depths))]
cur = 0
#定义每个阶段的PVT模块
for i in range(1, num_stages):
#使用PatchEmbed()将图像分割为patch并嵌入到高维特征向量中.参数列表:
##img_size:输入图像大小
##patch_size:图像patch的大小
##in_chans:shu如图像的通道数
##embed_dim:嵌入维度,将每个patch嵌入到一个embed_dim大小的特征(行)向量中
#设输入图像张量形状为(B,C,H,W),则返回嵌入向量序列(Tensor矩阵)形状为(B,N,embed_dim),其中N是patch的数目(也是嵌入向量的数目)
patch_embed = PatchEmbed(img_size=img_size if i == 0 else img_size // (2 ** (i + 1)),
patch_size=patch_size if i == 0 else 2,
in_chans=in_chans if i == 1 else embed_dims[i - 1],
embed_dim=embed_dims[i])
num_patches = patch_embed.num_patches if i != num_stages - 1 else patch_embed.num_patches + 1
#定义位置嵌入矩阵(可训练参数)
pos_embed = nn.Parameter(torch.zeros(1, num_patches, embed_dims[i]))
#位置嵌入的dropout,防止过拟合
pos_drop = nn.Dropout(p=drop_rate)
#定义当前阶段的多个Transformer模块
block = nn.ModuleList([Block(
dim=embed_dims[i], num_heads=num_heads[i], mlp_ratio=mlp_ratios[i], qkv_bias=qkv_bias,
qk_scale=qk_scale, drop=drop_rate, attn_drop=attn_drop_rate, drop_path=dpr[cur + j],
norm_layer=norm_layer, sr_ratio=sr_ratios[i])
for j in range(depths[i])])
cur += depths[i]
#动态为类定义属性,包括每个阶段的嵌入模块、位置嵌入、位置Dropout和Transformer块
setattr(self, f"patch_embed{i + 1}", patch_embed)
setattr(self, f"pos_embed{i + 1}", pos_embed)
setattr(self, f"pos_drop{i + 1}", pos_drop)
setattr(self, f"block{i + 1}", block)
#使用截断正太分布初始化位置嵌入
trunc_normal_(pos_embed, std=.02)
#对整个模块进行权重初始化(_init_weights在前面有定义)
self.apply(self._init_weights)
def __init__weights(self,m)
:对模型中不同类型的层初始化参数。
def _init_weights(self, m):
#全连接层
if isinstance(m, nn.Linear):
#截断正态分布初始化
trunc_normal_(m.weight, std=.02)
if isinstance(m, nn.Linear) and m.bias is not None:
nn.init.constant_(m.bias, 0)
#归一化层
elif isinstance(m, nn.LayerNorm):
nn.init.constant_(m.bias, 0)
nn.init.constant_(m.weight, 1.0)
def _get_pos_embed(self, pos_embed, patch_embed, H, W)
:通过插值法调整位置嵌入的尺寸,使其能够与特征图的尺寸一致,便于后续位置嵌入的操作。pos_embed
:需调整尺寸的位置嵌入序列。patch_embed
:图像patch序列。H
、W
:特征图的高度与宽度。
def _get_pos_embed(self, pos_embed, patch_embed, H, W):
#1.执行reshape将位置嵌入形状调整为(1,patch_embed.H,patch_embed.W,-1).其中,patch_embed.H、patch_embed.W表patch嵌入序列的高度、宽度.-1表自动推导出最后的维度(位置嵌入行向量通道数).
#2.执行permute()变换维度,(B,H,W,embed_dim)->(B,embed_dim,H,W)
#3.执行interpolate()进行双线性插值来调整位置嵌入尺寸,(patch_embed.H, patch_embed.W)->(H, W)
#4.执行reshape()将插值后的位置嵌入调整到(1,embed_dim,H*W),即,将特征图的空间维度展平为一个一维的序列
#5.执行permute()变换维度,(1,embed_dim,H*W)->(1,H*W,embed_dim)
return F.interpolate(
pos_embed.reshape(1, patch_embed.H, patch_embed.W, -1).permute(0, 3, 1, 2),
size=(H, W), mode="bilinear").reshape(1, -1, H * W).permute(0, 2, 1)
def make_layer(self, planes, blocks, stride, dilation)
:构建残差网络ResNet的一个层级,通常由多个Bottleneck模块堆叠而成。
def make_layer(self, planes, blocks, stride, dilation):
downsample = None
#若步幅不为1,或输入通道数不等于Bottleneck模块的输出通道数(planes*4),则需下采样
if stride != 1 or self.inplanes != planes*4:
#构建下采样模块
downsample = nn.Sequential(nn.Conv2d(self.inplanes, planes*4, kernel_size=1, stride=stride, bias=False), nn.BatchNorm2d(planes*4))
#添加Bottleneck模块
layers = [Bottleneck(self.inplanes, planes, stride, downsample, dilation=dilation)]
self.inplanes = planes*4
for _ in range(1, blocks):
layers.append(Bottleneck(self.inplanes, planes, dilation=dilation))
#将所有模块使用nn.Sequential()串联并返回
return nn.Sequential(*layers)
def forward(self,x)
:定义DBI模型编码器的前向传播过程,每个阶段中, R e s N e t ResNet ResNet模块(CBkbs
)和 P V T PVT PVT(tdfmx
和block
)模块分别处理输入特征,并在中间阶段通过融合模块(DFM)进行特征交互,最终产生多级特征。执行流程如下:- 输入阶段:
- (1)输入图像通过卷积(7x7,步长为2,填充为3,卷积核个数为64)+BatchNormlization+ReLU+最大池化。
- (2)输入
ResNet Bokck1
(CBkbs[0]
)得到初始卷积特征图out1
。
- 卷积模块和Transformer模块输入特征的初始化:
- (1)执行
c_out = out1
、t_out = out1
,将ResNet Bokck2
、PVT Block1
的输入特征均初始化为out1
,并将out1
保存到列表couts
中。
- (1)执行
- 循环阶段:依次处理ResNet2+PVT Block1、ResNet3+PVT Block2+DFM、ResNet4+PVT Block3+DFM、DFM,分别对应阶段1、2、3。
- (1)Transformer分支:使用
patch_embed
对上一PVT Block的输出t_out
分块处理得到curt_out
,若是2、3阶段则通过DFM模块融合t_out
(PVT提取的特征)和c_out
(ResNet提取的特征)。之后将位置编码pos_embed
与curt_out
相加,并应用pos_drop
得到更新后的curt_out
。将curt_out
输入到下一PVT Block中,之后重塑curt_out
形状,将其调整为 ( B , C , H , W ) (B,C,H,W) (B,C,H,W)作为下一阶段的t_out
。 - (2)卷积分支:若是2或3阶段,则通过
D
F
M
DFM
DFM模块将卷积特征
c
o
u
t
c_out
cout与
T
r
a
n
s
f
o
r
m
e
r
Transformer
Transformer特征
t
o
u
t
融合
t_out 融合
tout融合,再输入下一ResNet卷积块
C
B
k
b
s
[
i
]
CBkbs[i]
CBkbs[i]进行处理,更新
c
o
u
t
c_out
cout。之后将更新后的
c_out
添加到couts
列表中。
- (1)Transformer分支:使用
- 最终阶段:卷积特征
c_out
和Transformer
特征t_out
最终通过sumdfm
模块进行融合,产生最终的融合特征ct_fuse
,并存储到couts
列表中。
- 输入阶段:
def forward(self, x):
#存储四个ResNet模块输出的特征、最终合并的特征
couts = []
#获取batch_size
B = x.shape[0]
#输入阶段:卷积(7x7,步长为2,填充为3,卷积核个数为64)+BatchNormlization+ReLU+最大池化
out1 = F.relu(self.bn1(self.conv1(x)), inplace=True)
out1 = F.max_pool2d(out1, kernel_size=3, stride=2, padding=1)
#输入ResNet Block1,得到初始卷积特征图out1
out1 = self.CBkbs[0](out1)
c_out = out1
t_out = out1
#将ResNet Bokck1的输出保存到couts列表
couts.append(c_out)
#num_stages=4,循环共执行三次,对应ResNet Block2+PVT Block1、DFM+ResNet Block3+PVT Block2、DFM+ResNet Block4+PVT Block3、DFM(最终融合)
for i in range(1, self.num_stages):
#获取PVT分支
#获取patch_embed模块,用于将特征图分割为固定大小的patch并转化为一个低维向量
patch_embed = getattr(self, f"patch_embed{i + 1}")
#获取位置编码
pos_embed = getattr(self, f"pos_embed{i + 1}")
#获取位置编码后的dropout层
pos_drop = getattr(self, f"pos_drop{i + 1}")
#获取Transformer Block(Self-Attention+MLP模块)
block = getattr(self, f"block{i + 1}")
#当i==2或i==3时(2、3阶段),t_out和c_out 经过self.tdfmx[i-2](DFM)特征融合,即,融合CNN和Transformer模块提取的特征,并将其传入patch_embed.而i==1时(1阶段),直接将t_out(Transformer模块的输出)传入patch_embed,并融合.
#t_out、c_out分别代表Transformer、CNN上一个阶段的输出
#curt_out代表经过patch_embed处理后的输出特征图,(H,W)表示特征图的高度与宽度
curt_out, (H, W) = patch_embed(self.tdfmx[i - 2](t_out, c_out) if i in [2, 3] else t_out)
if i == self.num_stages - 1:
#若是最后阶段,则调整位置编码,将其调整为适应当前特征图的大小(H,W),其中pos_embed移除了向量0维度的值.因为位置编码(pos_embed)通常包含一个特殊的class token的位置编码来作为图像的类别信息.但在特征提取、语义分割任务中更关注特征图中的每个位置,无需图像的整体类别信息,故只保留与特征图位置相关的编码.
pos_embed = self._get_pos_embed(pos_embed[:, 1:], patch_embed, H, W)
else:
#调整位置编码,使其匹配当前特征图的大小(H,W)
pos_embed = self._get_pos_embed(pos_embed, patch_embed, H, W)
#融合位置编码与特征图,并输入dropout层
curt_out = pos_drop(curt_out + pos_embed)
#ResNet分支
#若是2、3阶段,则使用DFM模块融合c_out、t_out(CNN、Transformer提取到的特征)并送入到对应的ResNet Block(i+1)中,否则直接将原始的c_out作为输入
c_out = self.CBkbs[i](self.cdfmx[i - 2](c_out, t_out) if i in [2, 3] else c_out)
#PVT模块对提取的特征进行处理
for blk in block:
curt_out = blk(curt_out, H, W)
#重塑特征图形状,[B, num_patches, embed_dim]->[[B, H, W, embed_dim],即,将一维的patch序列重塑为二维图像特征,并通过permute将其调整为[B,embed_dim,H,W]以适应卷积网络的输入.
t_out = curt_out.reshape(B, H, W, -1).permute(0, 3, 1, 2).contiguous()
#保存ResNet提取的特征
couts.append(c_out)
#最后阶段,使用DFM融合特征并保存到couts列表中并返回
ct_fuse = self.sumdfm(c_out, t_out)
couts.append(ct_fuse)
return couts
AB2019_train.py
Burned_Area_Segment-2019数据集训练代码,与代码复现(二)基本一致。
def get_argparse()
:定义命令行参数。
参数名 | 作用 |
---|---|
–trainset_path | 训练集路径。 |
–testset_path | 测试集路径。 |
–num_classes | 类别数,默认2,仅火点和非火点两种。 |
–dataset | 数据集名称,默认LC8FPS256。 |
–threshold | 火灾像素阈值。 |
–model | 模型名称,默认FPSU2Net。 |
–epochs | 训练轮次,默认80。 |
–total_itrs | 总迭代次数,默认30000。 |
–lr | 学习率,默认0.001。 |
–lr_policy | 学习策略,默认poly,可选poly、step。 |
–step_size | 步长,默认10。 |
–batch_size | 批次大小,默认4。 |
–trainsize | 图像训练尺寸,默认256。 |
–n_cpu | 默认2。 |
–ckpt | 模型、优化器和调度器状态的保存路径,可用于加载检查点,默认None。 |
–loss_type | 损失函数类型,默认bi,可选bi、bce。 |
–gpu_id | gpu编号,默认0。 |
–weight_decay | 默认0.0001。 |
–random_seed | 随机种子,默认1。 |
–val_interval | 检查点保存间隔。 |
def get_dataset(opts)
:返回训练集、验证集的Dataset对象。def validate(opts, model, loader, device, metrics)
:验证模型性能,将模型的预测与真实标签进行比较,并使用指标(metrics
)来评估模型的准确性。opts
:命令行参数对象。model
:需验证的模型。loader
:验证集数据集加载器。device
:计算设备。metrics
:性能指标计算对象。
def validate(opts, model, loader, device, metrics):
#清除之前计算的指标结果,确保验证过程从头开始
metrics.reset()
#验证阶段模型无需更新权重,因此禁止梯度计算
with torch.no_grad():
#遍历验证集的输入图像、标签
for i, (images, labels) in tqdm(enumerate(loader)):
images = images.to(device, dtype=torch.float32)
labels = labels.to(device, dtype=torch.long)
#将图像送入模型,返回多个输出特征图和对应的激活值
s1,s2,s3,s4, s1_sig,s2_sig,s3_sig,s4_sig=model(images)
#s1即为最终特征图,其激活值作为模型输出,并使用squeeze()调整张量形状作为模型预测
outputs=s1_sig
preds=outputs.squeeze()
#将预测矩阵二值化
preds[preds>=opts.threshold]=1
preds[preds<opts.threshold]=0
#outputs是模型在前向传播过程中输出的张量,需将其从计算图中分离.并使用cpu()将其转移到cpu,因为numpy不支持GPU张量.
preds = outputs.detach().cpu().numpy()
#预测结果转为int型,便于与真实标签比较
preds=preds.astype(int)
#将真实标签 labels 转换为 NumPy 数组
targets = labels.cpu().numpy()
#若真实标签只有一个维度(即batch大小为1),使用squeeze()去除多余的维度,确保与预测结果的形状一致
if targets.shape[0]==1:
targets=targets.squeeze()
#更新评估指标
metrics.update(targets, preds)
#获取最终评估结果
score = metrics.get_results()
#返回分数
return score
def main()
:训练、验证模型。
def main():
if not os.path.exists('CHKP'):
utils.mkdir('CHKP')
opts = get_argparser().parse_args()
tb_writer = SummaryWriter()
torch.cuda.empty_cache()
os.environ['CUDA_VISIBLE_DEVICES'] = opts.gpu_id
print("Device: %s" % device)
torch.manual_seed(opts.random_seed)
np.random.seed(opts.random_seed)
random.seed(opts.random_seed)
train_dst, val_dst = get_dataset(opts)
opts.total_itrs=opts.epochs * (len(train_dst) // opts.batch_size)
print('opts:',opts)
train_loader = data.DataLoader(
train_dst, batch_size=opts.batch_size, shuffle=True, num_workers=opts.n_cpu,
drop_last=True)
val_loader = data.DataLoader(
val_dst, batch_size=opts.batch_size, shuffle=True, num_workers=opts.n_cpu)
print("Dataset: %s, Train set: %d, Val set: %d" %
(opts.dataset, len(train_dst), len(val_dst)))
model = eval(opts.model)()
optimizer = torch.optim.Adam(model.parameters(), lr=opts.lr, betas=(0.9, 0.999),
eps=1e-08, weight_decay=opts.weight_decay)
metrics = StreamSegMetrics(opts.num_classes)
if opts.lr_policy == 'poly':
scheduler = utils.PolyLR(optimizer, opts.total_itrs, power=0.9)
elif opts.lr_policy == 'step':
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=opts.step_size, gamma=0.1)
def save_ckpt(path):
torch.save({
"epoch": epoch+1,
"model_state": model.module.state_dict(),
"optimizer_state": optimizer.state_dict(),
"scheduler_state": scheduler.state_dict(),
}, path)
print("Model saved as %s" % path)
cur_epoch=0
if opts.ckpt is not None and os.path.isfile(opts.ckpt):
checkpoint = torch.load(opts.ckpt, map_location=torch.device('cpu'))
model.load_state_dict(checkpoint["model_state"])
model = nn.DataParallel(model)
model=model.to(device)
optimizer.load_state_dict(checkpoint["optimizer_state"])
scheduler.load_state_dict(checkpoint["scheduler_state"])
cur_epoch = checkpoint["epoch"]
print("Model restored from %s" % opts.ckpt)
del checkpoint # free memory
else:
print("[!] Retrain")
model = nn.DataParallel(model)
model=model.to(device)
for epoch in range(cur_epoch,opts.epochs):
model.train()
cur_itrs=0
data_loader = tqdm(train_loader, file=sys.stdout)
running_loss = 0.0
for (images, gts) in data_loader:
cur_itrs += 1
images = images.to(device, dtype=torch.float32)
gts = gts.to(device, dtype=torch.float32)
optimizer.zero_grad()
s1,s2,s3,s4, s1_sig,s2_sig,s3_sig,s4_sig= model(images)
loss1 = CE(s1, gts) + IOU(s1_sig, gts)
loss2 = CE(s2, gts) + IOU(s2_sig, gts)
loss3 = CE(s3, gts) + IOU(s3_sig, gts)
loss4 = CE(s4, gts) + IOU(s4_sig, gts)
total_loss = loss1 + loss2/2 + loss3/4 +loss4/8
running_loss += total_loss.data.item()
total_loss.backward()
optimizer.step()
data_loader.desc = "Epoch {}/{}, loss={:.4f}".format(epoch, opts.epochs, running_loss/cur_itrs)
scheduler.step()
if (epoch+1) % opts.val_interval == 0:
save_ckpt('CHKP/latest_{}_{}.pth'.format(opts.model, opts.dataset))
print("validation...")
model.eval()
#传入验证集加载器、输出模型评估分数
val_score = validate(
opts=opts, model=model, loader=val_loader, device=device, metrics=metrics)
#打印验证分数
print('val_score:',val_score)
tags = ["train_loss", "learning_rate","Mean_Acc","Mean_IoU","BA_IoU"]
tb_writer.add_scalar(tags[0], (running_loss/cur_itrs), epoch)
tb_writer.add_scalar(tags[1], optimizer.param_groups[0]["lr"], epoch)
tb_writer.add_scalar(tags[2], val_score['Mean Acc'], epoch)
tb_writer.add_scalar(tags[3], val_score['Mean IoU'], epoch)
tb_writer.add_scalar(tags[4], val_score['Class IoU'][1], epoch)