ResNet34是由16个残差块和一个全局平局池化层和一个全连接层组成,即32个卷积层+1个pooling层+1和fc层。
训练的数据集是cifar10数据集,训练次数5,损失函数为CrossEntropyLoss(),optimizer = torch.optim.SGD。
1.先定义残差块,每个块中有两个3×3卷积
class Residual(nn.Module):
def __init__(self, in_channels, out_channels, use_1x1conv=False, stride=1):
super().__init__()
self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1)
self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1)
if use_1x1conv:
self.conv3 = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, padding=0)
self.bn3 = nn.BatchNorm2d(out_channels)
else:
self.conv3 = None
self.bn1 = nn.BatchNorm2d(out_channels)
self.bn2 = nn.BatchNorm2d(out_channels)
def forward(self, x):
y = F.relu(self.bn1(self.conv1(x)))
y = self.bn2(self.conv2(y))
if self.conv3:
x = self.bn3(self.conv3(x))
return F.relu(y + x)
上面代码中使用1×1卷积的作用是修改输入数据x的通道数,使得x与y的形状相同。
2.定义由上面小残差块组成的大块
下图是resnet原论文中34层网络图
这一部分目的是构建由相同颜色的resnet块组成的大块,每个大块中小块个数为3,4,6,3,代码如下
def resnet_block(in_channels, out_channels, num_residuals, first_block=False):
if first_block == True:
assert in_channels == out_channels
blk = []
for i in range(num_residuals):
if i==0 and not first_block:
blk.append(Residual(in_channels, out_channels, use_1x1conv=True, stride=2))
else:
blk.append(Residual(out_channels, out_channels))
return nn.Sequential(*blk)
3.定义网络
下面是数据进入残差结构的卷积、BN等操作,类似于VGG16。
# 定义网络
net = nn.Sequential(
nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3),
nn.BatchNorm2d(64),
nn.ReLU(),
nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
)
然后向网络net中添加resnet模块
net.add_module('resnet_block1', resnet_block(64, 64, 3, first_block=True))
net.add_module('resnet_block2', resnet_block(64, 128, 4))
net.add_module('resnet_block3', resnet_block(128, 256, 6))
net.add_module('resnet_block4', resnet_block(256, 512, 3))
下面是池化层和全连接层,池化层将7×7特征图转成1×1,全连接层将(N,C,H,W)的输入转成(N,CHW),在对这个数据应用仿射线性变换,如下
net.add_module('avg', GlobalAvgPool2d())
net.add_module('fc', nn.Sequential(FlattenLayer(), nn.Linear(512, 10)))
4.加载数据集
这部分对数据集的处理包括,原图根据短边在[256,480]之间缩放,随即旋转裁剪固定224大小,再将数据转成Tensor格式。
再用torchvision读取数据dataloader加载数据
# 加载数据集
def load_data_cifar10(batch_size, root='~/Datasets/CIFAR10'):
image_transform = torchvision.transforms.Compose([
# Step 1: 随机调整短边大小到 [256, 480]
torchvision.transforms.RandomResizedCrop(224, scale=(256 / 480, 1.0), ratio=(0.75, 1.33)),
# Step 2: 随机水平翻转
torchvision.transforms.RandomHorizontalFlip(),
# Step 3: 转换为 Tensor 格式
torchvision.transforms.ToTensor(),
])
cifar_train = torchvision.datasets.CIFAR10(root=root, train=True, transform=image_transform, download=True)
cifar_test = torchvision.datasets.CIFAR10(root=root, train=False, transform=image_transform, download=True)
if sys.platform.startswith('win'):
num_workers = 0 # 0表示不用额外的进程来加速读取数据
else:
num_workers = 4
train_iter = torch.utils.data.DataLoader(cifar_train, batch_size=batch_size, shuffle=True, num_workers=num_workers)
test_iter = torch.utils.data.DataLoader(cifar_test, batch_size=batch_size, shuffle=True, num_workers=num_workers)
return train_iter, test_iter
5.进行模型的训练与评估
def evaluate_accuracy(data_iter, net, device=None):
if device is None and isinstance(net, torch.nn.Module):
# 如果没指定device就使用net的device
device = list(net.parameters())[0].device
acc_sum, n = 0.0, 0
with torch.no_grad():
for X, y in data_iter:
if isinstance(net, torch.nn.Module):
net.eval() # 评估模式, 这会关闭dropout
acc_sum += (net(X.to(device)).argmax(dim=1) == y.to(device)).float().sum().cpu().item()
net.train() # 改回训练模式
else: # 自定义的模型, 3.13节之后不会用到, 不考虑GPU
if('is_training' in net.__code__.co_varnames): # 如果有is_training这个参数
# 将is_training设置成False
acc_sum += (net(X, is_training=False).argmax(dim=1) == y).float().sum().item()
else:
acc_sum += (net(X).argmax(dim=1) == y).float().sum().item()
n += y.shape[0]
return acc_sum / n
# 训练
def train_ch5(net, train_iter, test_iter, batch_size, optimizer, device, num_epochs):
net = net.to(device)
print("training on ", device)
loss = torch.nn.CrossEntropyLoss()
for epoch in range(num_epochs):
train_l_sum, train_acc_sum, n, batch_count, start = 0.0, 0.0, 0, 0, time.time()
for X, y in train_iter:
X = X.to(device)
y = y.to(device)
y_hat = net(X)
l = loss(y_hat, y)
optimizer.zero_grad()
l.backward()
optimizer.step()
train_l_sum += l.cpu().item()
train_acc_sum += (y_hat.argmax(dim=1) == y).sum().cpu().item()
n += y.shape[0]
batch_count += 1
test_acc = evaluate_accuracy(test_iter, net)
print('epoch %d, loss %.4f, train acc %.3f, test acc %.3f, time %.1f sec'
% (epoch + 1, train_l_sum / batch_count, train_acc_sum / n, test_acc, time.time() - start))
训练结果
完整代码
import torch
from torch import nn
import torchvision
import sys
import torch.nn.functional as F
import time
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# 定义残差块,每个块中有2个3×3卷积
class Residual(nn.Module):
def __init__(self, in_channels, out_channels, use_1x1conv=False, stride=1):
super().__init__()
self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1)
self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1)
if use_1x1conv:
self.conv3 = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, padding=0)
self.bn3 = nn.BatchNorm2d(out_channels)
else:
self.conv3 = None
self.bn1 = nn.BatchNorm2d(out_channels)
self.bn2 = nn.BatchNorm2d(out_channels)
def forward(self, x):
y = F.relu(self.bn1(self.conv1(x)))
y = self.bn2(self.conv2(y))
if self.conv3:
x = self.bn3(self.conv3(x))
return F.relu(y + x)
# 定义网络
net = nn.Sequential(
nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3),
nn.BatchNorm2d(64),
nn.ReLU(),
nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
)
def resnet_block(in_channels, out_channels, num_residuals, first_block=False):
if first_block == True:
assert in_channels == out_channels
blk = []
for i in range(num_residuals):
if i==0 and not first_block:
blk.append(Residual(in_channels, out_channels, use_1x1conv=True, stride=2))
else:
blk.append(Residual(out_channels, out_channels))
return nn.Sequential(*blk)
class GlobalAvgPool2d(nn.Module):
def __init__(self) -> None:
super(GlobalAvgPool2d, self).__init__()
def forward(self, x):
return F.avg_pool2d(x, kernel_size=x.size()[2:])
class FlattenLayer(nn.Module):
def __init__(self) -> None:
super(FlattenLayer, self).__init__()
def forward(self, x):
return x.view(x.shape[0], -1)
net.add_module('resnet_block1', resnet_block(64, 64, 3, first_block=True))
net.add_module('resnet_block2', resnet_block(64, 128, 4))
net.add_module('resnet_block3', resnet_block(128, 256, 6))
net.add_module('resnet_block4', resnet_block(256, 512, 3))
net.add_module('avg', GlobalAvgPool2d())
net.add_module('fc', nn.Sequential(FlattenLayer(), nn.Linear(512, 10)))
# 加载数据集
def load_data_cifar10(batch_size, root='~/Datasets/CIFAR10'):
image_transform = torchvision.transforms.Compose([
# Step 1: 随机调整短边大小到 [256, 480]
torchvision.transforms.RandomResizedCrop(224, scale=(256 / 480, 1.0), ratio=(0.75, 1.33)),
# Step 2: 随机水平翻转
torchvision.transforms.RandomHorizontalFlip(),
# Step 3: 转换为 Tensor 格式
torchvision.transforms.ToTensor(),
])
cifar_train = torchvision.datasets.CIFAR10(root=root, train=True, transform=image_transform, download=True)
cifar_test = torchvision.datasets.CIFAR10(root=root, train=False, transform=image_transform, download=True)
if sys.platform.startswith('win'):
num_workers = 0 # 0表示不用额外的进程来加速读取数据
else:
num_workers = 4
train_iter = torch.utils.data.DataLoader(cifar_train, batch_size=batch_size, shuffle=True, num_workers=num_workers)
test_iter = torch.utils.data.DataLoader(cifar_test, batch_size=batch_size, shuffle=True, num_workers=num_workers)
return train_iter, test_iter
def evaluate_accuracy(data_iter, net, device=None):
if device is None and isinstance(net, torch.nn.Module):
# 如果没指定device就使用net的device
device = list(net.parameters())[0].device
acc_sum, n = 0.0, 0
with torch.no_grad():
for X, y in data_iter:
if isinstance(net, torch.nn.Module):
net.eval() # 评估模式, 这会关闭dropout
acc_sum += (net(X.to(device)).argmax(dim=1) == y.to(device)).float().sum().cpu().item()
net.train() # 改回训练模式
else: # 自定义的模型, 3.13节之后不会用到, 不考虑GPU
if('is_training' in net.__code__.co_varnames): # 如果有is_training这个参数
# 将is_training设置成False
acc_sum += (net(X, is_training=False).argmax(dim=1) == y).float().sum().item()
else:
acc_sum += (net(X).argmax(dim=1) == y).float().sum().item()
n += y.shape[0]
return acc_sum / n
# 训练
def train_ch5(net, train_iter, test_iter, batch_size, optimizer, device, num_epochs):
net = net.to(device)
print("training on ", device)
loss = torch.nn.CrossEntropyLoss()
for epoch in range(num_epochs):
train_l_sum, train_acc_sum, n, batch_count, start = 0.0, 0.0, 0, 0, time.time()
for X, y in train_iter:
X = X.to(device)
y = y.to(device)
y_hat = net(X)
l = loss(y_hat, y)
optimizer.zero_grad()
l.backward()
optimizer.step()
train_l_sum += l.cpu().item()
train_acc_sum += (y_hat.argmax(dim=1) == y).sum().cpu().item()
n += y.shape[0]
batch_count += 1
test_acc = evaluate_accuracy(test_iter, net)
print('epoch %d, loss %.4f, train acc %.3f, test acc %.3f, time %.1f sec'
% (epoch + 1, train_l_sum / batch_count, train_acc_sum / n, test_acc, time.time() - start))
batch_size = 64
# 如出现“out of memory”的报错信息,可减小batch_size或resize
train_iter, test_iter = load_data_cifar10(batch_size)
lr, num_epochs = 0.01, 5
optimizer = torch.optim.SGD(net.parameters(), lr=lr, momentum=0.9, weight_decay=0.0001)
train_ch5(net, train_iter, test_iter, batch_size, optimizer, device, num_epochs)