本文基于李沐老师的 实战 Kaggle 比赛:图像分类 (CIFAR-10)
文章目录
- 数据格式
- 数据加载
- 标签转化
- 划分数据集
- 查看数据
- 搭建网络
- 主训练函数
- 网络训练
数据格式
CIFAR-10 下载地址:https://www.kaggle.com/competitions/cifar-10/data
下拉到最下面有一个 Download All 选线,即可下载所有的文件,下载完成之后,完成解压。解压出来的东西如下:
1、test 文件夹中存放的为验证集的数据,比较多,解压可能需要消耗一点时间。
2、train 文件夹中存放的为训练集的数据。
3、sampleSubmission.csv 为你网络训练完后,测得验证机图片对应的标号填入的文件。在 kaggle 的比赛提交的结果就是这个。
4、trainLabels.csv 为训练集图片和其对应的标号对应关系。
数据没有为我们划分训练集和验证集,所以需要我们自己在 train 中划分出部分数据作为训练数据集。
数据加载
mport os
import math
import pandas as pd
import numpy as np
import torch
from torch import nn
import torchvision
from torchvision import transforms
import torchvision.models as models
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import matplotlib.pyplot as plt
from tqdm import tqdm
class Cifar10Dataset(Dataset):
def __init__(self, data_info, file_path, class_to_num, mode='train', transforms=transforms.ToTensor()):
self.file_path = file_path
self.mode = mode
self.data_info = data_info
self.data_len = len(self.data_info.index)
self.transforms = transforms
def __len__(self):
# 返回当前数据集总的长度
return self.data_len
def __getitem__(self, index):
# 根据输入的索引获取到当前图片的路径,并打开图片
img_path = ""
label = -1
if self.mode == "train" or self.mode == "valid":
# 读取训练集图片的路径
img_path = self.file_path + "/train/" + str(self.data_info['id'][index]) + ".png"
# 读取对应的标签,并将其转换为数字
str_label = self.data_info['label'][index]
label = class_to_num[str_label]
else:
img_path = self.file_path + "/test/" + str(self.data_info['id'][index]) + ".png"
img_orin = Image.open(img_path)
# 对图像做增强处理
img = self.transforms(img_orin)
return img, label
一般情况下我们需要针对训练集和验证集创建不同的 Dataset 加载方式,也可以使用逻辑单元来实现一个 Dataset 公用。
1、继承 Dataset
的作用:是 pytorch 中数据加载的默认方式类,我们继承它,实现其中的一些类方法之后,就可以使用 pytorch 自带的数据加载器的方法,在后面根据 batch_size 和 collate_fn 在 DataLoader 中加载我们的数据。
2、实现 __len__ (self)
函数的作用:在后面我们使用 pytorch 中 DataLoader 来创建数据迭代器的时候,需要获取训练集呀、验证集呀和测试集合的长度,这样我们才可以根据 batch_size 来划分数据。
3、实现 __getitem__ (self, index)
函数的作用:输入一个索引值, 我们就可以获得对应索引的训练数据和对应的标号,主要是为了可以下面这样获得数据:Cifar10Dataset(index)
,在其中我们可以自定义我们自己的数据处理方式,需要注意的是,我们是根据一个 index 来获取 一个训练数据和一个对应的标号, 所以返回值为 img, label,在后面我们获取 batch_size 的大小的数据的时候,其中数据的存放方式为: ( img, label), ( img, label), .....( img, label)
,然而我们训练的时候 batch_size 的输入应该为 ( img, img, ..., img), ( lable, label, ..., label)
,所以单纯的依靠 __getitem__ (self, index)
的功能是不足的,依靠的是下面要将的 collate_fn
函数。
标签转化
# 定义文件的路径
root_path = "../data/cifar-10"
data_info = pd.read_csv(os.path.join(root_path, "trainLabels.csv"))
# 制造类别:字符串和int数值的转换
# 把label文件去重,排个序
str_labels = sorted(list(set(data_info['label'])))
n_classes = len(str_labels)
class_to_num = dict(zip(str_labels, range(n_classes)))
# 数字转成对应label,方便最后预测的时候使用
num_to_class = {v : k for k, v in class_to_num.items()}
print(class_to_num, "\n", num_to_class)
在 trainLabels.csv 中存放的标号为字符串,在训练的时候,我们一般使用为数字,这样就需要手动的转化。
1、class_to_num :存放的为从 string -> int 的标号的转化过程,在制造训练和测试迭代器的时候使用。
2、num_to_class :存放的为从 int -> string 的标号转化过程,在制造验证数据的最后结果和绘图展示的时候会用到。
划分数据集
# 在train数据及中划分出验证数据集
valid_ratio = 0.1
train_data_info = data_info.iloc[ : int(len(data_info.index) * (1 - valid_ratio)), :]
valid_data_info = data_info.iloc[int(len(data_info.index) * (1 - valid_ratio)):, :]
valid_data_info.index -= int(len(data_info.index) * (1 - valid_ratio))
#制造测试集的数据信息
test_data_info = None
test_csv_path = os.path.join(root_path,'sampleSubmission.csv')
test_data_info = pd.read_csv(test_csv_path)
#定义train、valid、test数据增强手段
train_data_transforms = transforms.Compose([
# 图片很小,放大一下有利于后面的操作
transforms.Resize(40),
# 随机裁剪出⼀个⾼度和宽度均为40像素的正⽅形图像,
# ⽣成⼀个⾯积为原始图像⾯积0.64到1倍的⼩正⽅形,
# 然后将其缩放为⾼度和宽度均为32像素的正⽅形
transforms.RandomResizedCrop(32, scale=(0.64, 1.0), ratio=(1.0, 1.0)),
# 水平翻转
torchvision.transforms.RandomHorizontalFlip(),
# 变成张量
transforms.ToTensor(),
# 标准化图像的每个通道,数据来源于imagenet图片中的均值和方差
torchvision.transforms.Normalize([0.4914, 0.4822, 0.4465],[0.2023, 0.1994, 0.2010])
])
test_data_transforms = transforms.Compose([
# 变成张量
transforms.ToTensor(),
# 标准化图像的每个通道
torchvision.transforms.Normalize([0.4914, 0.4822, 0.4465],[0.2023, 0.1994, 0.2010])
])
#分别创造train、valid、test 数据加载器
train_dataset = Cifar10Dataset(train_data_info, root_path, class_to_num, mode="train", transforms=train_data_transforms)
valid_dataset = Cifar10Dataset(valid_data_info, root_path, class_to_num, mode="valid", transforms=test_data_transforms)
test_dataset = Cifar10Dataset(test_data_info, root_path, class_to_num, mode="test", transforms=test_data_transforms)
#创建数据迭代器
batch_size = 256
num_workers = 0
train_iter = DataLoader(train_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers, drop_last=True)
valid_iter = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers, drop_last=True)
test_iter = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers, drop_last=True)
1、valid_ratio: 为我们打算从原始的训练数据中划分一部分作为验证机的比列,一般训练数据集和验证的比例为 1:9,也可以更改,随情况而定。
2、test_data_info:在我们训练完成自己的网络之后,我们需要使用验证集的数据来验证我们的网络,也可以才用 batch_size 的方式来加快验证,所以这里也创建了一个 test_data_info。
3、train_data_transforms 和 test_data_transforms :分别对应着训练集的数据增强手段和验证、测试集的数据增强手段。
为什么训练集的数据增强手段要多余验证集的数据增强手段?
因为,一般训练的时候,我们希望网络更具有鲁棒性,减少过拟合,采用多种数据增强手段,可以在不改变数据总体分布情况的条件下,增加训练数据的泛化性,使得网络能够真的提取关键信息。
在验证的时候,我们一般仅仅需要网络获得当前数据的标号,所以一般情况下是不做过多的数据增强手段的。测试的时候也是相同的原理。
4、DataLoader 的作用:根据输入的 batch_size 的大小,从 dataset 批次性的创建迭代器,shuffle 是读取数据的时候,是否随机选取读取的 index,num_workers 是数据加载的时候所开启的线程,一般情况下读取数据的速度需要满足网络训练的速度,drop_last 是最后一个 batch_size 不够的情况下,丢弃掉。
其实其中还可以输入一个 collate_fn 的参数,此参数的作用,是将 ( img, label), ( img, label), .....( img, label)
换成网络可接受的输入形状:( img, img, ..., img), ( lable, label, ..., label)
,那为什么我们没有写自己的 collate_fn 函数呢,因为针对图片分类的问题, label 为一个维度,pytorch 官方默认的 collate_fn 函数可以帮我们处理,但是针对目标检测的数据,label 通常由 anchors 生成,造成了 anchor 是多维的,这个时候就需要我们在其中传递我们自己写的 collate_fn 函数。
查看数据
# 查看创建的迭代器是否可以正确的读取数据
def im_convert(tensor):
image = tensor.to("cpu").clone().detach()
image = image.numpy().squeeze()
image = image.transpose(1,2,0)
image = image.clip(0, 1)
return image
# 创建画布
# fig = plt.figure(figsize=(20, 12))
# trainDataIter = iter(train_iter)
# validDataIter = iter(valid_iter)
# testDataIter = iter(test_iter)
# img, label = trainDataIter.next()
# # 显示网络真正加载的图片
# for i in range(8):
# ax = fig.add_subplot(4,4,i+1,xticks=[],yticks=[])
# ax.set_title(num_to_class[int(label[i])])
# plt.imshow(im_convert(img[i]))
# # 显示数据增强之前的图片
# for i in range(8, 16):
# ax = fig.add_subplot(4,4,i+1,xticks=[],yticks=[])
# ax.set_title(num_to_class[int(label[i - 8])])
# img = plt.imread(root_path + "/train/" + str(train_data_info['id'][i - 8]) + ".png")
# plt.imshow(img)
# plt.show()
# img, label = validDataIter.next()
# # 显示网络真正加载的图片
# for i in range(8):
# ax = fig.add_subplot(4,4,i+1,xticks=[],yticks=[])
# ax.set_title(num_to_class[int(label[i])])
# plt.imshow(im_convert(img[i]))
# # 显示数据增强之前的图片
# for i in range(8, 16):
# ax = fig.add_subplot(4,4,i+1,xticks=[],yticks=[])
# ax.set_title(num_to_class[int(label[i - 8])])
# img = plt.imread(root_path + "/train/" + str(valid_data_info['id'][i - 8]) + ".png")
# plt.imshow(img)
# plt.show()
# # 测试验证集
# img, label = testDataIter.next()
# # 显示网络真正加载的图片
# for i in range(8):
# ax = fig.add_subplot(4,4,i+1,xticks=[],yticks=[])
# plt.imshow(im_convert(img[i]))
# # 显示数据增强之前的图片
# for i in range(8, 16):
# ax = fig.add_subplot(4,4,i+1,xticks=[],yticks=[])
# img = plt.imread(root_path + "/test/" + str(test_data_info['id'][i - 8]) + ".png")
# plt.imshow(img)
# plt.show()
从创建的迭代器中,拿出数据增强前后的图片现实,查看其中的不同,也是查看数据增强手段是否正确,trainDataIter 等是否编码正确。
搭建网络
# 下面开始定义自己的网络,
def get_resnet(num_classes, num_block=18, is_pretrained=False):
net = None
if num_block == 18:
net = models.resnet18(pretrained=is_pretrained)
elif num_block == 34:
net = models.resnet34(pretrained=is_pretrained)
elif num_block == 50:
net = models.resnet50(pretrained=is_pretrained)
elif num_block == 101:
net = models.resnet101(pretrained=is_pretrained)
elif num_block == 152:
net = models.resnet152(pretrained=is_pretrained)
else:
raise "No model fit you num_blocks"
net.fc = nn.Sequential( # 替换最后一层
nn.Linear(net.fc.in_features, num_classes)
)
return net
这里我们没有自定义网络,而是从预训练模型中加载了不同的 resnet 网络来当作训练网络。其中
net.fc = nn.Sequential( # 替换最后一层
nn.Linear(net.fc.in_features, num_classes)
)
因为原始的 resnet 是做 1000 类别的分类,我们这里只需要做 10 类的分类,所以我们需要将最后一层换掉,重新设置需要分类的分类数目。
主训练函数
import datetime
# 定义自己的主训练函数
def train_Cifar10(net, device, train_iter, valid_iter, num_epochs, learning_rate, weight_decay, lr_period, lr_decay):
print("training on: ", device)
# 定义自己的优化器
optm = torch.optim.SGD(net.parameters(), lr=learning_rate, weight_decay=weight_decay, momentum=0.9)
# 定义自己学习率的下降公式, 每经过lr_period个epoch,学习率lr=lr*lr_decay
scheduler = torch.optim.lr_scheduler.StepLR(optm, lr_period, lr_decay)
# 定义自己的损失函数
loss = nn.CrossEntropyLoss()
# 开始 epochs 的训练
for epoch in range(num_epochs):
train_loss = []
train_acc = []
valid_loss = []
valid_acc = []
# 让网络进入训练模式,防止后面在验证集上进入评价模式
net.train()
pmgressbar_train = tqdm(train_iter, desc=f'Train epoch {epoch + 1 } / {num_epochs}', postfix=dict, mininterval=0.3)
for imgs, labels in pmgressbar_train:
with torch.no_grad():
# 数据放到 gpu 设备上
imgs = imgs.to(device)
labels = labels.to(device)
# 送入网络获得输出
preds = net(imgs)
# 优化器梯度清零
optm.zero_grad()
# 计算损失
l = loss(preds, labels)
# 反向传播
l.backward()
# 使用优化器更新权重
optm.step()
# 计算损失和计算准确率
acc = (preds.argmax(dim=-1) == labels).float().mean().item()
train_loss.append(l.item())
train_acc.append(acc)
train_loss_mean = sum(train_loss) / len(train_loss)
pmgressbar_train.set_postfix(**{'train_loss' : train_loss_mean,
'train_acc' : acc})
pmgressbar_train.update()
pmgressbar_train.close()
# 每个epoch训练完,更新一下学习率
scheduler.step()
train_loss_mean = sum(train_loss) / len(train_loss)
train_acc_mean = sum(train_acc) / len(train_acc)
print(f'the {epoch+1} epoch, loss = {train_loss_mean:.5f}, train_acc = {train_acc_mean:.5f}')
if ((epoch % 5) == 0 or epoch == (num_epochs - 1)):
# 网络进入验证模式
net.eval()
pmgressbar_valid = tqdm(valid_iter, desc=f'Test epoch {epoch + 1 } / {num_epochs}', postfix=dict, mininterval=0.3)
for imgs, labels in pmgressbar_valid:
with torch.no_grad():
# 数据放到 gpu 设备上
imgs = imgs.to(device)
labels = labels.to(device)
# 获得网络的输出
preds = net(imgs)
# 获得精度和验证机损失
valid_l = loss(preds, labels)
valid_loss.append(valid_l.item())
acc = (preds.argmax(dim=-1)==labels).float().mean().item()
valid_acc.append(acc)
valid_loss_mean = sum(valid_loss) / len(valid_loss)
pmgressbar_valid.set_postfix(**{'valid_loss_mean' : valid_loss_mean,
'valid_acc_mean' : acc})
pmgressbar_valid.update()
valid_loss_mean = sum(valid_loss) / len(valid_loss)
valid_acc_mean = sum(valid_acc) / len(valid_acc)
print(f'the {epoch+1} epoch, loss = {valid_loss_mean:.5f}, valid_acc = {valid_acc_mean:.5f}')
pmgressbar_valid.close()
# 所有epoch全部跑完, 保存权重
path = "../data/" + datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S') + '_renet_params.pth'
torch.save({'model': net.state_dict()}, path)
print('Training completed!!')
# 定义测试函数
def test_Cifar10(net, device, test_iter, file):
net.eval()
all_preds_labels = []
for imgs, _ in tqdm(test_iter):
# 送入网络获得预测输出
preds = net(imgs)
# 对预测的最后一个维度计算最大值的位置
preds_labels = preds.argmax(dim=-1).cpu.numpy().tolist()
# 记录下所有的预测的标签
all_preds_labels.append([num_to_class(preds_label) for preds_label in preds_labels ])
print('all test img numbers is :', len(all_preds_labels))
return all_preds_labels
# 获取当前的GPU, 没有的话使用CPU
def get_device():
return 'cuda' if torch.cuda.is_available() else 'cpu'
train_Cifar10
: 是主训练函数,其中 torch.optim.lr_scheduler.StepLR
是一种阶梯性的学习率下降公式,每经过 lr_period 个 epoch,学习率 lr=lr*lr_decay。
test_Cifar10
:是测试函数,输入数据集,获取网络预测结果,再将预测的结果转换为 string 类型,写入对应的 csv 文件中。
网络训练
# 开始建立训练参数,并开始训练
# 获取训练的设备
device = get_device()
# 获取网络模型
train_net = get_resnet(10, num_block=18, is_pretrained=True)
train_net = train_net.to(device)
# 训练的超参数
num_epochs, learning_rate, weight_decay, lr_period, lr_decay = 50, 2e-4, 5e-4, 4, 0.9
train_Cifar10(train_net, device, train_iter, valid_iter, num_epochs, learning_rate, weight_decay, lr_period, lr_decay)
训练的时候将会现实如下的训练进度
这是正常训练的结果,后面使用 test_Cifar10 函数将验证得到的数据写入 csv 文件即可。