文章目录
- 简述
- 模型结构
- 模型参数、优化器、损失函数
- 参数初始化
- 优化器
- 损失函数
- 模型训练、测试集预测、模型保存、日志记录
- 训练
- 测试集测试
- 模型保存
- 模型训练完整代码
- tensorboard训练可视化结果
- train_loss
- 测试准确率
- 测试集loss
- 模型应用
- 模型独立应用代码`api.py`
- 预测结果
简述
使用pytorch实现一个用于训练CIFAR10的模型,在训练过程中使用CIFAR10的测试数据集记录准确度。训练结束后,搜集一些图片,单独实现对训练后模型的应用代码。
另外会在文中尽量给出各种用法的官方文档链接。
代码分为:
- 模型训练代码
train.py
,包含数据加载、模型封装、训练、tensorboard记录、模型保存等; - 模型应用代码
api.py
,包含对训练所保存模型的加载、数据准备、结果预测等;
注意:
本文目的是使用pytorch来构建一个结构完善的模型,体现出pytorch的各种功能函数、模型设计理念,来学习深度学习,而非训练一个高精度的分类识别模型。
不足:
- 参数初始化或许可以考虑kaiming(因为用的是ReLU);
- 可以加上k折交叉验证;
- 训练时可以把batch_size的图片加入tensorboard,文中batch_size=256,若每个batch_size都加的话数据太多了,所以文中是每逢整百的训练次数时记录一下该批次的loss值,加图片的话可以在该代码处添加;
模型结构
来源:https://www.researchgate.net/profile/Yiren-Zhou-6/publication/312170477/figure/fig1/AS:448817725218816@1484017892071/Structure-of-LeNet-5.png
在上述图片基础上增加了nn.BatchNorm2d
、nn.ReLU
以及nn.Dropout
,最终结构如下:
layers = nn.Sequential(
# shape(3,32,32) -> shape(32,32,32)
nn.Conv2d(3, 32, kernel_size=5, stride=1, padding=2),
nn.BatchNorm2d(32),
nn.ReLU(),
# shape(32,32,32) -> shape(32,16,16)
nn.MaxPool2d(kernel_size=2, stride=2),
# shape(32,16,16) -> shape(32,16,16)
nn.Conv2d(32, 32, kernel_size=5, stride=1, padding=2),
nn.BatchNorm2d(32),
nn.ReLU(),
# shape(32,16,16) -> shape(32,8,8)
nn.MaxPool2d(kernel_size=2, stride=2),
# shape(32,8,8) -> shape(64,8,8)
nn.Conv2d(32, 64, kernel_size=5, stride=1, padding=2),
nn.BatchNorm2d(64),
nn.ReLU(),
# shape(64, 8, 8) -> shape(64,4,4)
nn.MaxPool2d(kernel_size=2, stride=2),
# shape(64,4,4) -> shape(64 * 4 * 4,)
nn.Flatten(),
nn.Linear(64 * 4 * 4, 64),
nn.ReLU(),
nn.Dropout(0.5),
nn.Linear(64, 10)
)
可以看看使用tensorboard的writer.add_graph
函数实现的模型结构图:
模型参数、优化器、损失函数
参数初始化
模型参数使用nn.init.normal_
作初始化,但模型中存在ReLU
,应考虑使用kaiming He
初始化。
apply
函数:Module — PyTorch 2.4 documentation
参数初始化函数:torch.nn.init — PyTorch 2.4 documentation
def init_normal(m):
# 考虑使用kaiming
if m is nn.Linear:
nn.init.normal_(m.weight, mean=0, std=0.01)
nn.init.zeros_(m.bias)
# 定义模型、数据初始化
net = CIFAR10Net()
net.apply(init_normal)
优化器
优化器使用Adam,即Momentum
和AdaGrad
的结合。
文档:Adam — PyTorch 2.4 documentation
# 优化器
weight_decay = 0.0001
optimizer = torch.optim.Adam(net.parameters(), lr=learning_rate, weight_decay=weight_decay)
损失函数
分类任务,自然是用交叉熵损失函数了。
loss_fn = nn.CrossEntropyLoss()
模型训练、测试集预测、模型保存、日志记录
注意,代码前面部分代码有定义
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
训练
net.train()
for images, labels in train_loader:
images, labels = images.to(device), labels.to(device)
outputs = net(images)
loss = loss_fn(outputs, labels)
# 优化器处理
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_train_step += 1
if total_train_step % 100 == 0:
print(f'Epoch: {epoch + 1}, 累计训练次数: {total_train_step}, 本次loss: {loss.item():.4f}')
writer.add_scalar('train_loss', loss.item(), total_train_step)
current_time = time.time()
writer.add_scalar('train_time', current_time-start_time, total_train_step)
测试集测试
net.eval()
total_test_loss = 0
total_test_acc = 0 # 整个测试集正确个数
with torch.no_grad():
for images, labels in test_loader:
images, labels = images.to(device), labels.to(device)
outputs = net(images)
loss = loss_fn(outputs, labels)
total_test_loss += loss.item()
accuracy = (outputs.argmax(1) == labels).sum()
total_test_acc += accuracy
print(f'整个测试集loss值和: {total_test_loss:.4f}, batch_size: {batch_size}')
print(f'整个测试集正确率: {(total_test_acc / test_data_size) * 100:.4f}%')
writer.add_scalar('test_loss', total_test_loss, epoch + 1)
writer.add_scalar('test_acc', (total_test_acc / test_data_size) * 100, epoch + 1)
模型保存
torch.save(net.state_dict(),
'./save/epoch_{}_params_acc_{}.pth'.format(epoch+1, (total_test_acc / test_data_size)))
模型训练完整代码
train.py
import torch
import torchvision
from torch.utils.tensorboard import SummaryWriter
from torchvision import transforms
from torch.utils import data
from torch import nn
import time
from datetime import datetime
def load_data_CIFAR10(resize=None):
"""
下载 CIFAR10 数据集,然后将其加载到内存中
transforms.ToTensor() 转换为形状为C x H x W的FloatTensor,并且会将像素值从[0, 255]缩放到[0.0, 1.0]
""" trans = [transforms.ToTensor()]
if resize:
trans.insert(0, transforms.Resize(resize))
trans = transforms.Compose(trans)
cifar_train = torchvision.datasets.CIFAR10(root="../data", train=True, transform=trans, download=False)
cifar_test = torchvision.datasets.CIFAR10(root="../data", train=False, transform=trans, download=False)
return cifar_train, cifar_test
class CIFAR10Net(torch.nn.Module):
def __init__(self):
super(CIFAR10Net, self).__init__()
layers = nn.Sequential(
# shape(3,32,32) -> shape(32,32,32)
nn.Conv2d(3, 32, kernel_size=5, stride=1, padding=2),
nn.BatchNorm2d(32),
nn.ReLU(),
# shape(32,32,32) -> shape(32,16,16)
nn.MaxPool2d(kernel_size=2, stride=2),
# shape(32,16,16) -> shape(32,16,16)
nn.Conv2d(32, 32, kernel_size=5, stride=1, padding=2),
nn.BatchNorm2d(32),
nn.ReLU(),
# shape(32,16,16) -> shape(32,8,8)
nn.MaxPool2d(kernel_size=2, stride=2),
# shape(32,8,8) -> shape(64,8,8)
nn.Conv2d(32, 64, kernel_size=5, stride=1, padding=2),
nn.BatchNorm2d(64),
nn.ReLU(),
# shape(64, 8, 8) -> shape(64,4,4)
nn.MaxPool2d(kernel_size=2, stride=2),
# shape(64,4,4) -> shape(64 * 4 * 4,)
nn.Flatten(),
nn.Linear(64 * 4 * 4, 64),
nn.ReLU(),
nn.Dropout(0.5),
nn.Linear(64, 10)
)
self.layers = layers
def forward(self, x):
return self.layers(x)
def init_normal(m):
# 考虑使用kaiming
if m is nn.Linear:
nn.init.normal_(m.weight, mean=0, std=0.01)
nn.init.zeros_(m.bias)
if __name__ == '__main__':
# 超参数
epochs = 6
batch_size = 256
learning_rate = 0.01
num_workers = 0
weight_decay = 0
# 数据记录
total_train_step = 0
total_test_step = 0
train_loss_list = list()
test_loss_list = list()
train_acc_list = list()
test_acc_list = list()
# 准备数据集
train_data, test_data = load_data_CIFAR10()
train_loader = data.DataLoader(train_data, batch_size=batch_size, shuffle=True, num_workers=num_workers)
test_loader = data.DataLoader(test_data, batch_size=batch_size, shuffle=False, num_workers=num_workers)
train_data_size = len(train_data)
test_data_size = len(test_data)
print(f'训练测试集长度: {train_data_size}, 测试数据集长度: {test_data_size}, batch_size: {batch_size}\n')
# device = torch.device("cpu")
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f'\ndevice: {device}')
# 定义模型、数据初始化
net = CIFAR10Net().to(device)
# net.apply(init_normal)
# 损失函数
loss_fn = nn.CrossEntropyLoss().to(device)
# 优化器
optimizer = torch.optim.Adam(net.parameters(), lr=learning_rate, weight_decay=weight_decay)
# now_time = datetime.now()
# now_time = now_time.strftime("%Y%m%d-%H%M%S") # tensorboard writer = SummaryWriter('./train_logs')
# 随便定义个输入, 好使用add_graph
tmp = torch.rand((batch_size, 3, 32, 32)).to(device)
writer.add_graph(net, tmp)
start_time = time.time()
for epoch in range(epochs):
print('------------Epoch {}/{}'.format(epoch + 1, epochs))
# 训练
net.train()
for images, labels in train_loader:
images, labels = images.to(device), labels.to(device)
outputs = net(images)
loss = loss_fn(outputs, labels)
# 优化器处理
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_train_step += 1
if total_train_step % 100 == 0:
print(f'Epoch: {epoch + 1}, 累计训练次数: {total_train_step}, 本次loss: {loss.item():.4f}')
writer.add_scalar('train_loss', loss.item(), total_train_step)
current_time = time.time()
writer.add_scalar('train_time', current_time-start_time, total_train_step)
# 测试
net.eval()
total_test_loss = 0
total_test_acc = 0 # 整个测试集正确个数
with torch.no_grad():
for images, labels in test_loader:
images, labels = images.to(device), labels.to(device)
outputs = net(images)
loss = loss_fn(outputs, labels)
total_test_loss += loss.item()
accuracy = (outputs.argmax(1) == labels).sum()
total_test_acc += accuracy
print(f'整个测试集loss值和: {total_test_loss:.4f}, batch_size: {batch_size}')
print(f'整个测试集正确率: {(total_test_acc / test_data_size) * 100:.4f}%')
writer.add_scalar('test_loss', total_test_loss, epoch + 1)
writer.add_scalar('test_acc', (total_test_acc / test_data_size) * 100, epoch + 1)
torch.save(net.state_dict(),
'./save/epoch_{}_params_acc_{}.pth'.format(epoch+1, (total_test_acc / test_data_size)))
writer.close()
tensorboard训练可视化结果
train_loss
纵轴为每个batch_size损失值,横轴为训练次数,其中batch_size = 256。
测试准确率
纵轴为整个CIFAR10测试集的准确率(%),横轴为epoch,其中epochs=50。
测试集loss
纵轴为CIFAR10整个测试集的每个batch_size的loss之和,batch_size = 256。横轴为epoch,其中epochs=50。
模型应用
模型训练过程中,每个epoch保存一次模型。
torch.save(net.state_dict(), './save/epoch_{}_params_acc_{}.pth'.format(epoch+1, (total_test_acc / test_data_size)))
这里实现一个,将保存的模型加载,并对自行搜集的图片进行预测。
项目结构:
-
./autodl_save/cuda_params_acc_75.pth
:训练时保存的模型参数文件; -
./test_images
:网上搜集的卡车、狗、飞机、船图片,大小不一,保存时未作处理,如下:
-
api.py
:实现图片的预处理(裁剪、ToTensor
、封装为数据集等)、模型加载、图片推理等;
模型独立应用代码api.py
import os
import torch
import torchvision
from PIL import Image
from torch import nn
class CIFAR10Net(torch.nn.Module):
def __init__(self):
super(CIFAR10Net, self).__init__()
layers = nn.Sequential(
# shape(3,32,32) -> shape(32,32,32)
nn.Conv2d(3, 32, kernel_size=5, stride=1, padding=2),
nn.BatchNorm2d(32),
nn.ReLU(),
# shape(32,32,32) -> shape(32,16,16)
nn.MaxPool2d(kernel_size=2, stride=2),
# shape(32,16,16) -> shape(32,16,16)
nn.Conv2d(32, 32, kernel_size=5, stride=1, padding=2),
nn.BatchNorm2d(32),
nn.ReLU(),
# shape(32,16,16) -> shape(32,8,8)
nn.MaxPool2d(kernel_size=2, stride=2),
# shape(32,8,8) -> shape(64,8,8)
nn.Conv2d(32, 64, kernel_size=5, stride=1, padding=2),
nn.BatchNorm2d(64),
nn.ReLU(),
# shape(64, 8, 8) -> shape(64,4,4)
nn.MaxPool2d(kernel_size=2, stride=2),
# shape(64,4,4) -> shape(64 * 4 * 4,)
nn.Flatten(),
nn.Linear(64 * 4 * 4, 64),
nn.ReLU(),
nn.Dropout(0.5),
nn.Linear(64, 10)
)
self.layers = layers
def forward(self, x):
return self.layers(x)
def build_data(images_dir):
image_list = os.listdir(images_dir)
image_paths = []
for image in image_list:
image_paths.append(os.path.join(images_dir, image))
transform = torchvision.transforms.Compose([torchvision.transforms.Resize((32, 32)),
torchvision.transforms.ToTensor()])
# 存储转换后的张量
images_tensor = []
for image_path in image_paths:
try:
# 加载图像并转换为 RGB(如果它已经是 RGB,这步是多余的)
image_pil = Image.open(image_path).convert('RGB')
# 应用转换并添加到列表中
images_tensor.append(transform(image_pil))
except IOError:
print(f"Cannot open {image_path}. Skipping...")
# 转换列表为单个张量,如果需要的话
# 注意:这里假设所有图像都被成功加载和转换
if images_tensor:
# 使用 torch.stack 来合并张量列表
images_tensor = torch.stack(images_tensor)
else:
# 如果没有图像,返回一个空的张量或根据需要处理
images_tensor = torch.empty(0, 3, 32, 32)
return images_tensor, image_list
def predict(state_dict_path, image):
net = CIFAR10Net()
net.load_state_dict(torch.load(state_dict_path))
net.cuda()
with torch.no_grad():
image = image.cuda()
output = net(image)
return output
if __name__ == '__main__':
images, labels = build_data("./test_images")
outputs = predict("./autodl_save/cuda_params_acc_75.pth", images)
# 选取结果(即得分最大的下标)
res = outputs.argmax(dim=1)
kinds = ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']
for i in range(len(res)):
classes_idx = res[i]
print(f'文件(正确标签): {labels[i]}, 预测结果: {classes_idx}, {kinds[classes_idx]}\n')
预测结果
7个识别出4个。
注意这个索引和标签的对应关系可以从数据集中查看。